[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r151019591 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -852,6 +662,41 @@ private[spark] object RandomForest extends Logging { } /** + * Find the best split for a node. + * + * @param binAggregates Bin statistics. + * @return tuple for best split: (Split, information gain, prediction at node) + */ + private[tree] def binsToBestSplit( + binAggregates: DTStatsAggregator, + splits: Array[Array[Split]], + featuresForNode: Option[Array[Int]], + node: LearningNode): (Split, ImpurityStats) = { +val validFeatureSplits = getNonConstantFeatures(binAggregates.metadata, featuresForNode) +// For each (feature, split), calculate the gain, and select the best (feature, split). +val parentImpurityCalc = if (node.stats == null) None else Some(node.stats.impurityCalculator) --- End diff -- I believe so, the nodes at the top level are created ([RandomForest.scala:178](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala#L178)) with [`LearningNode.emptyNode`](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala#L341), which sets `node.stats = null`. I could change this to check node depth (via node index), but if we're planning on deprecating node indices in the future it might be best not to. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r151017375 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.{CategoricalSplit, Split} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Utility methods for choosing splits during local & distributed tree training. */ +private[impl] object SplitUtils { + + /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */ + private def sortByCentroid( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int): List[Int] = { +/* Each bin is one category (feature value). + * The bins are ordered based on centroidForCategories, and this ordering determines which + * splits are considered. (With K categories, we consider K - 1 possible splits.) 
+ * + * centroidForCategories is a list: (category, centroid) + */ +val numCategories = binAggregates.metadata.numBins(featureIndex) +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) + +val centroidForCategories = Range(0, numCategories).map { featureValue => + val categoryStats = +binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue) + val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats) + (featureValue, centroid) +} +// TODO(smurching): How to handle logging statements like these? +// logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(",")) +// bins sorted by centroids +val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1) +// logDebug("Sorted centroids for categorical variable = " + +// categoriesSortedByCentroid.mkString(",")) +categoriesSortedByCentroid + } + + /** + * Find the best split for an unordered categorical feature at a single node. + * + * Algorithm: + * - Considers all possible subsets (exponentially many) + * + * @param featureIndex Global index of feature being split. + * @param featureIndexIdx Index of feature being split within subset of features for current node. + * @param featureSplits Array of splits for the current feature + * @param parentCalculator Optional: ImpurityCalculator containing impurity stats for current node + * @return (best split, statistics for split) If no valid split was found, the returned + * ImpurityStats instance will be invalid (have member valid = false). 
+ */ + private[impl] def chooseUnorderedCategoricalSplit( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int, + featureSplits: Array[Split], + parentCalculator: Option[ImpurityCalculator] = None): (Split, ImpurityStats) = { +// Unordered categorical feature +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) +val numSplits = binAggregates.metadata.numSplits(featureIndex) +var parentCalc = parentCalculator +val (bestFeatureSplitIndex, bestFeatureGainStats) = + Range(0, numSplits).map { splitIndex => +val leftChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, splitIndex) +val rightChildStats = binAggregates.getParentImpurityCalculator() + .subtract(leftChildStats) +val gainAndImpurityStats = ImpurityUtils.calculateImpurityStats(parentCalc, + leftChildStats, rightChildStats, binAggregates.metadata) +// Compute parent stats once, when considering first split for current feature +if
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r151011913 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -627,221 +621,37 @@ private[spark] object RandomForest extends Logging { } /** - * Calculate the impurity statistics for a given (feature, split) based upon left/right - * aggregates. - * - * @param stats the recycle impurity statistics for this feature's all splits, - * only 'impurity' and 'impurityCalculator' are valid between each iteration - * @param leftImpurityCalculator left node aggregates for this (feature, split) - * @param rightImpurityCalculator right node aggregate for this (feature, split) - * @param metadata learning and dataset metadata for DecisionTree - * @return Impurity statistics for this (feature, split) + * Return a list of pairs (featureIndexIdx, featureIndex) where featureIndex is the global + * (across all trees) index of a feature and featureIndexIdx is the index of a feature within the + * list of features for a given node. Filters out constant features (features with 0 splits) */ - private def calculateImpurityStats( - stats: ImpurityStats, - leftImpurityCalculator: ImpurityCalculator, - rightImpurityCalculator: ImpurityCalculator, - metadata: DecisionTreeMetadata): ImpurityStats = { - -val parentImpurityCalculator: ImpurityCalculator = if (stats == null) { - leftImpurityCalculator.copy.add(rightImpurityCalculator) -} else { - stats.impurityCalculator -} - -val impurity: Double = if (stats == null) { - parentImpurityCalculator.calculate() -} else { - stats.impurity -} - -val leftCount = leftImpurityCalculator.count -val rightCount = rightImpurityCalculator.count - -val totalCount = leftCount + rightCount - -// If left child or right child doesn't satisfy minimum instances per node, -// then this split is invalid, return invalid information gain stats. 
-if ((leftCount < metadata.minInstancesPerNode) || - (rightCount < metadata.minInstancesPerNode)) { - return ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator) -} - -val leftImpurity = leftImpurityCalculator.calculate() // Note: This equals 0 if count = 0 -val rightImpurity = rightImpurityCalculator.calculate() - -val leftWeight = leftCount / totalCount.toDouble -val rightWeight = rightCount / totalCount.toDouble - -val gain = impurity - leftWeight * leftImpurity - rightWeight * rightImpurity - -// if information gain doesn't satisfy minimum information gain, -// then this split is invalid, return invalid information gain stats. -if (gain < metadata.minInfoGain) { - return ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator) + private[impl] def getNonConstantFeatures( + metadata: DecisionTreeMetadata, + featuresForNode: Option[Array[Int]]): Seq[(Int, Int)] = { +Range(0, metadata.numFeaturesPerNode).map { featureIndexIdx => --- End diff -- At some point when refactoring I was hitting errors caused by a stateful operation within a `map` over the output of this method (IIRC the result of the `map` was accessed repeatedly, causing the stateful operation to inadvertently be run multiple times). However using `withFilter` and `view` now seems to work, I'll change it back :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r151011879 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala --- @@ -112,7 +113,7 @@ private[spark] object ImpurityStats { * minimum number of instances per node. */ def getInvalidImpurityStats(impurityCalculator: ImpurityCalculator): ImpurityStats = { -new ImpurityStats(Double.MinValue, impurityCalculator.calculate(), +new ImpurityStats(Double.MinValue, impurity = -1, --- End diff -- I changed this to -1 because the node impurity would eventually be set to -1 anyway when a `LearningNode` with invalid `ImpurityStats` is converted into a decision tree leaf node (see [`LearningNode.toNode`](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala#L279)). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r150349566 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -852,6 +662,41 @@ private[spark] object RandomForest extends Logging { } /** + * Find the best split for a node. + * + * @param binAggregates Bin statistics. + * @return tuple for best split: (Split, information gain, prediction at node) + */ + private[tree] def binsToBestSplit( + binAggregates: DTStatsAggregator, + splits: Array[Array[Split]], + featuresForNode: Option[Array[Int]], + node: LearningNode): (Split, ImpurityStats) = { +val validFeatureSplits = getNonConstantFeatures(binAggregates.metadata, featuresForNode) +// For each (feature, split), calculate the gain, and select the best (feature, split). +val parentImpurityCalc = if (node.stats == null) None else Some(node.stats.impurityCalculator) --- End diff -- Note to check: Is `node.stats == null` guaranteed to hold for the top-level nodes? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r150159113 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.{CategoricalSplit, Split} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Utility methods for choosing splits during local & distributed tree training. */ +private[impl] object SplitUtils { + + /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */ + private def sortByCentroid( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int): List[Int] = { +/* Each bin is one category (feature value). + * The bins are ordered based on centroidForCategories, and this ordering determines which + * splits are considered. (With K categories, we consider K - 1 possible splits.) 
+ * + * centroidForCategories is a list: (category, centroid) + */ +val numCategories = binAggregates.metadata.numBins(featureIndex) +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) + +val centroidForCategories = Range(0, numCategories).map { featureValue => + val categoryStats = +binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue) + val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats) + (featureValue, centroid) +} +// TODO(smurching): How to handle logging statements like these? +// logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(",")) +// bins sorted by centroids +val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1) +// logDebug("Sorted centroids for categorical variable = " + +// categoriesSortedByCentroid.mkString(",")) +categoriesSortedByCentroid + } + + /** + * Find the best split for an unordered categorical feature at a single node. + * + * Algorithm: + * - Considers all possible subsets (exponentially many) + * + * @param featureIndex Global index of feature being split. + * @param featureIndexIdx Index of feature being split within subset of features for current node. + * @param featureSplits Array of splits for the current feature + * @param parentCalculator Optional: ImpurityCalculator containing impurity stats for current node + * @return (best split, statistics for split) If no valid split was found, the returned + * ImpurityStats instance will be invalid (have member valid = false). 
+ */ + private[impl] def chooseUnorderedCategoricalSplit( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int, + featureSplits: Array[Split], + parentCalculator: Option[ImpurityCalculator] = None): (Split, ImpurityStats) = { +// Unordered categorical feature +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) +val numSplits = binAggregates.metadata.numSplits(featureIndex) +var parentCalc = parentCalculator +val (bestFeatureSplitIndex, bestFeatureGainStats) = + Range(0, numSplits).map { splitIndex => +val leftChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, splitIndex) +val rightChildStats = binAggregates.getParentImpurityCalculator() + .subtract(leftChildStats) +val gainAndImpurityStats = ImpurityUtils.calculateImpurityStats(parentCalc, + leftChildStats, rightChildStats, binAggregates.metadata) +// Compute parent stats once, when considering first split for current feature +if
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r150159513 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.{CategoricalSplit, Split} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Utility methods for choosing splits during local & distributed tree training. */ +private[impl] object SplitUtils { + + /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */ + private def sortByCentroid( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int): List[Int] = { +/* Each bin is one category (feature value). + * The bins are ordered based on centroidForCategories, and this ordering determines which + * splits are considered. (With K categories, we consider K - 1 possible splits.) 
+ * + * centroidForCategories is a list: (category, centroid) + */ +val numCategories = binAggregates.metadata.numBins(featureIndex) +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) + +val centroidForCategories = Range(0, numCategories).map { featureValue => + val categoryStats = +binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue) + val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats) + (featureValue, centroid) +} +// TODO(smurching): How to handle logging statements like these? +// logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(",")) +// bins sorted by centroids +val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1) +// logDebug("Sorted centroids for categorical variable = " + +// categoriesSortedByCentroid.mkString(",")) +categoriesSortedByCentroid + } + + /** + * Find the best split for an unordered categorical feature at a single node. + * + * Algorithm: + * - Considers all possible subsets (exponentially many) + * + * @param featureIndex Global index of feature being split. + * @param featureIndexIdx Index of feature being split within subset of features for current node. + * @param featureSplits Array of splits for the current feature + * @param parentCalculator Optional: ImpurityCalculator containing impurity stats for current node + * @return (best split, statistics for split) If no valid split was found, the returned + * ImpurityStats instance will be invalid (have member valid = false). 
+ */ + private[impl] def chooseUnorderedCategoricalSplit( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int, + featureSplits: Array[Split], + parentCalculator: Option[ImpurityCalculator] = None): (Split, ImpurityStats) = { +// Unordered categorical feature +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) +val numSplits = binAggregates.metadata.numSplits(featureIndex) +var parentCalc = parentCalculator +val (bestFeatureSplitIndex, bestFeatureGainStats) = + Range(0, numSplits).map { splitIndex => +val leftChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, splitIndex) +val rightChildStats = binAggregates.getParentImpurityCalculator() + .subtract(leftChildStats) +val gainAndImpurityStats = ImpurityUtils.calculateImpurityStats(parentCalc, + leftChildStats, rightChildStats, binAggregates.metadata) +// Compute parent stats once, when considering first split for current feature +if
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r150158027 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -627,221 +621,37 @@ private[spark] object RandomForest extends Logging { } /** - * Calculate the impurity statistics for a given (feature, split) based upon left/right - * aggregates. - * - * @param stats the recycle impurity statistics for this feature's all splits, - * only 'impurity' and 'impurityCalculator' are valid between each iteration - * @param leftImpurityCalculator left node aggregates for this (feature, split) - * @param rightImpurityCalculator right node aggregate for this (feature, split) - * @param metadata learning and dataset metadata for DecisionTree - * @return Impurity statistics for this (feature, split) + * Return a list of pairs (featureIndexIdx, featureIndex) where featureIndex is the global + * (across all trees) index of a feature and featureIndexIdx is the index of a feature within the + * list of features for a given node. Filters out constant features (features with 0 splits) */ - private def calculateImpurityStats( - stats: ImpurityStats, - leftImpurityCalculator: ImpurityCalculator, - rightImpurityCalculator: ImpurityCalculator, - metadata: DecisionTreeMetadata): ImpurityStats = { - -val parentImpurityCalculator: ImpurityCalculator = if (stats == null) { - leftImpurityCalculator.copy.add(rightImpurityCalculator) -} else { - stats.impurityCalculator -} - -val impurity: Double = if (stats == null) { - parentImpurityCalculator.calculate() -} else { - stats.impurity -} - -val leftCount = leftImpurityCalculator.count -val rightCount = rightImpurityCalculator.count - -val totalCount = leftCount + rightCount - -// If left child or right child doesn't satisfy minimum instances per node, -// then this split is invalid, return invalid information gain stats. 
-if ((leftCount < metadata.minInstancesPerNode) || - (rightCount < metadata.minInstancesPerNode)) { - return ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator) -} - -val leftImpurity = leftImpurityCalculator.calculate() // Note: This equals 0 if count = 0 -val rightImpurity = rightImpurityCalculator.calculate() - -val leftWeight = leftCount / totalCount.toDouble -val rightWeight = rightCount / totalCount.toDouble - -val gain = impurity - leftWeight * leftImpurity - rightWeight * rightImpurity - -// if information gain doesn't satisfy minimum information gain, -// then this split is invalid, return invalid information gain stats. -if (gain < metadata.minInfoGain) { - return ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator) + private[impl] def getNonConstantFeatures( + metadata: DecisionTreeMetadata, + featuresForNode: Option[Array[Int]]): Seq[(Int, Int)] = { +Range(0, metadata.numFeaturesPerNode).map { featureIndexIdx => --- End diff -- Was there a reason to remove the use of view and withFilter here? With the output of this method going through further Seq operations, I would expect the previous implementation to be more efficient. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r150160368 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.{CategoricalSplit, Split} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Utility methods for choosing splits during local & distributed tree training. */ +private[impl] object SplitUtils { + + /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */ + private def sortByCentroid( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int): List[Int] = { +/* Each bin is one category (feature value). + * The bins are ordered based on centroidForCategories, and this ordering determines which + * splits are considered. (With K categories, we consider K - 1 possible splits.) 
+ * + * centroidForCategories is a list: (category, centroid) + */ +val numCategories = binAggregates.metadata.numBins(featureIndex) +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) + +val centroidForCategories = Range(0, numCategories).map { featureValue => + val categoryStats = +binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue) + val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats) + (featureValue, centroid) +} +// TODO(smurching): How to handle logging statements like these? +// logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(",")) +// bins sorted by centroids +val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1) +// logDebug("Sorted centroids for categorical variable = " + +// categoriesSortedByCentroid.mkString(",")) +categoriesSortedByCentroid + } + + /** + * Find the best split for an unordered categorical feature at a single node. + * + * Algorithm: + * - Considers all possible subsets (exponentially many) + * + * @param featureIndex Global index of feature being split. + * @param featureIndexIdx Index of feature being split within subset of features for current node. + * @param featureSplits Array of splits for the current feature + * @param parentCalculator Optional: ImpurityCalculator containing impurity stats for current node + * @return (best split, statistics for split) If no valid split was found, the returned + * ImpurityStats instance will be invalid (have member valid = false). 
+ */ + private[impl] def chooseUnorderedCategoricalSplit( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int, + featureSplits: Array[Split], + parentCalculator: Option[ImpurityCalculator] = None): (Split, ImpurityStats) = { +// Unordered categorical feature +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) +val numSplits = binAggregates.metadata.numSplits(featureIndex) +var parentCalc = parentCalculator --- End diff -- It'd be nice to calculate the parentCalc right away here, if needed. That seems possible just by taking the first candidate split. Then we could simplify calculateImpurityStats by not passing in parentCalc as an option. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r150352747 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala --- @@ -112,7 +113,7 @@ private[spark] object ImpurityStats { * minimum number of instances per node. */ def getInvalidImpurityStats(impurityCalculator: ImpurityCalculator): ImpurityStats = { -new ImpurityStats(Double.MinValue, impurityCalculator.calculate(), +new ImpurityStats(Double.MinValue, impurity = -1, --- End diff -- Q: Why -1 here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r150154413 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/AggUpdateUtils.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.Split + +/** + * Helpers for updating DTStatsAggregators during collection of sufficient stats for tree training. + */ +private[impl] object AggUpdateUtils { + + /** + * Updates the parent node stats of the passed-in impurity aggregator with the labels + * corresponding to the feature values at indices [from, to). 
+ * @param indices Array of row indices for feature values; indices(i) = row index of the ith + *feature value + */ + private[impl] def updateParentImpurity( + statsAggregator: DTStatsAggregator, + indices: Array[Int], + from: Int, + to: Int, + instanceWeights: Array[Double], + labels: Array[Double]): Unit = { +from.until(to).foreach { idx => + val rowIndex = indices(idx) + val label = labels(rowIndex) + statsAggregator.updateParent(label, instanceWeights(rowIndex)) +} + } + + /** + * Update aggregator for an (unordered feature, label) pair + * @param featureSplits Array of splits for the current feature + */ + private[impl] def updateUnorderedFeature( + agg: DTStatsAggregator, + featureValue: Int, + label: Double, + featureIndex: Int, + featureIndexIdx: Int, + featureSplits: Array[Split], + instanceWeight: Double): Unit = { +val leftNodeFeatureOffset = agg.getFeatureOffset(featureIndexIdx) +// Each unordered split has a corresponding bin for impurity stats of data points that fall +// onto the left side of the split. For each unordered split, update left-side bin if applicable +// for the current data point. +val numSplits = agg.metadata.numSplits(featureIndex) +var splitIndex = 0 +while (splitIndex < numSplits) { + if (featureSplits(splitIndex).shouldGoLeft(featureValue, featureSplits)) { +agg.featureUpdate(leftNodeFeatureOffset, splitIndex, label, instanceWeight) + } + splitIndex += 1 +} + } + + /** Update aggregator for an (ordered feature, label) pair */ + private[impl] def updateOrderedFeature( + agg: DTStatsAggregator, + featureValue: Int, + label: Double, + featureIndex: Int, --- End diff -- featureIndex is not used --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r150309552 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.{CategoricalSplit, Split} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Utility methods for choosing splits during local & distributed tree training. */ +private[impl] object SplitUtils { + + /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */ + private def sortByCentroid( + binAggregates: DTStatsAggregator, + featureIndex: Int, + featureIndexIdx: Int): List[Int] = { +/* Each bin is one category (feature value). + * The bins are ordered based on centroidForCategories, and this ordering determines which + * splits are considered. (With K categories, we consider K - 1 possible splits.) 
+ * + * centroidForCategories is a list: (category, centroid) + */ +val numCategories = binAggregates.metadata.numBins(featureIndex) +val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx) + +val centroidForCategories = Range(0, numCategories).map { featureValue => + val categoryStats = +binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue) + val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats) + (featureValue, centroid) +} +// TODO(smurching): How to handle logging statements like these? --- End diff -- What's the issue? You should be able to call logDebug if this object inherits from org.apache.spark.internal.Logging --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r147317401 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. 
+// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier. + * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. 
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new TrainingInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r147307553 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. 
+// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier. + * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. 
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new PartitionInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r147036693 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. 
+// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier. + * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. 
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new PartitionInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r146981434 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. 
+// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier. + * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. 
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new PartitionInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r146735946 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. 
+// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier. + * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. 
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new PartitionInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r146731101 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/TrainingInfo.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.ml.tree.{LearningNode, Split} +import org.apache.spark.util.collection.BitSet + +/** + * Maintains intermediate state of data (columns) and tree during local tree training. + * Primary local tree training data structure; contains all information required to describe + * the state of the algorithm at any point during learning. + * + * Nodes are indexed left-to-right along the periphery of the tree, with 0-based indices. + * The "periphery" is the set of leaf nodes (active and inactive). + * + * @param columns Array of columns. + * Each column is sorted first by nodes (left-to-right along the tree periphery); + * all columns share this first level of sorting. + * Within each node's group, each column is sorted based on feature value; + * this second level of sorting differs across columns. 
+ * @param instanceWeights Array of weights for each training example + * @param nodeOffsets Offsets into the columns indicating the first level of sorting (by node). + * The rows corresponding to the node activeNodes(i) are in the range + * [nodeOffsets(i)(0), nodeOffsets(i)(1)) . + * @param activeNodes Nodes which are active (still being split). + * Inactive nodes are known to be leaves in the final tree. + */ +private[impl] case class TrainingInfo( +columns: Array[FeatureVector], +instanceWeights: Array[Double], --- End diff -- Good call, I'll move `instanceWeights` outside `TrainingInfo` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r146731083 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. 
+// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier. + * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. 
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new PartitionInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r146731070 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/AggUpdateUtils.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.Split + +/** + * Helpers for updating DTStatsAggregators during collection of sufficient stats for tree training. + */ +private[impl] object AggUpdateUtils { + + /** + * Updates the parent node stats of the passed-in impurity aggregator with the labels + * corresponding to the feature values at indices [from, to). + */ + private[impl] def updateParentImpurity( + statsAggregator: DTStatsAggregator, + col: FeatureVector, + from: Int, + to: Int, + instanceWeights: Array[Double], + labels: Array[Double]): Unit = { +from.until(to).foreach { idx => + val rowIndex = col.indices(idx) + val label = labels(rowIndex) + statsAggregator.updateParent(label, instanceWeights(rowIndex)) +} + } + + /** + * Update aggregator for an (unordered feature, label) pair + * @param splits Array of arrays of splits for each feature; splits(i) = splits for feature i. 
+ */ + private[impl] def updateUnorderedFeature( + agg: DTStatsAggregator, + featureValue: Int, + label: Double, + featureIndex: Int, + featureIndexIdx: Int, + splits: Array[Array[Split]], --- End diff -- Good call, I'll make this change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r146731072 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/FeatureVector.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.util.collection.BitSet + +/** + * Stores values for a single training data column (a single continuous or categorical feature). + * + * Values are currently stored in a dense representation only. + * TODO: Support sparse storage (to optimize deeper levels of the tree), and maybe compressed + * storage (to optimize upper levels of the tree). + * + * TODO: Sort feature values to support more complicated splitting logic (e.g. considering every + * possible continuous split instead of discretizing continuous features). + * + * NOTE: We could add sorting of feature values in this PR; the only changed required would be to + * sort feature values at construction-time. Sorting might improve locality during stats + * aggregation (we'd frequently update the same O(statsSize) array for a (feature, bin), + * instead of frequently updating for the same feature). 
+ * + * @param featureArity For categorical features, this gives the number of categories. + * For continuous features, this should be set to 0. + * @param rowIndices Optional: rowIndices(i) is the row index of the ith feature value (values(i)) + * If unspecified, feature values are assumed to be ordered by row (i.e. values(i) + * is a feature value from the ith row). + */ +private[impl] class FeatureVector( +val featureIndex: Int, +val featureArity: Int, +val values: Array[Int], +private val rowIndices: Option[Array[Int]]) + extends Serializable { + // Associates feature values with training point rows. indices(i) = training point index + // (row index) of ith feature value + val indices = rowIndices.getOrElse(values.indices.toArray) + + def isCategorical: Boolean = featureArity > 0 + + /** For debugging */ + override def toString: String = { +" FeatureVector(" + + s"featureIndex: $featureIndex,\n" + + s"featureType: ${if (featureArity == 0) "Continuous" else "Categorical"},\n" + + s"featureArity: $featureArity,\n" + + s"values: ${values.mkString(", ")},\n" + + s"indices: ${indices.mkString(", ")},\n" + + " )" + } + + def deepCopy(): FeatureVector = +new FeatureVector(featureIndex, featureArity, values.clone(), Some(indices.clone())) + + override def equals(other: Any): Boolean = { +other match { + case o: FeatureVector => +featureIndex == o.featureIndex && featureArity == o.featureArity && + values.sameElements(o.values) && indices.sameElements(o.indices) + case _ => false +} + } + + /** + * Reorders the subset of feature values at indices [from, to) in the passed-in column + * according to the split information encoded in instanceBitVector (feature values for rows + * that split left appear before feature values for rows that split right). 
+ * + * @param numLeftRows Number of rows on the left side of the split + * @param tempVals Destination buffer for reordered feature values + * @param tempIndices Destination buffer for row indices corresponding to reordered feature values + * @param instanceBitVector instanceBitVector(i) = true if the row for the ith feature + * value splits right, false otherwise + */ + private[ml] def updateForSplit( + from: Int, + to: Int, + numLeftRows: Int, + tempVals: Array[Int], + tempIndices: Array[Int], + instanceBitVector: BitSet): Unit = { + +//
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r144787368 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. 
+// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier. + * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. 
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new PartitionInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r144788264 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. 
+// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier. + * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. 
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new PartitionInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r144790803 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/TrainingInfo.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.ml.tree.{LearningNode, Split} +import org.apache.spark.util.collection.BitSet + +/** + * Maintains intermediate state of data (columns) and tree during local tree training. + * Primary local tree training data structure; contains all information required to describe + * the state of the algorithm at any point during learning.?? + * + * Nodes are indexed left-to-right along the periphery of the tree, with 0-based indices. + * The "periphery" is the set of leaf nodes (active and inactive). + * + * @param columns Array of columns. + * Each column is sorted first by nodes (left-to-right along the tree periphery); + * all columns share this first level of sorting. + * Within each node's group, each column is sorted based on feature value; + * this second level of sorting differs across columns. 
+ * @param instanceWeights Array of weights for each training example + * @param nodeOffsets Offsets into the columns indicating the first level of sorting (by node). + * The rows corresponding to the node activeNodes(i) are in the range + * [nodeOffsets(i)(0), nodeOffsets(i)(1)) . + * @param activeNodes Nodes which are active (still being split). + * Inactive nodes are known to be leaves in the final tree. + */ +private[impl] case class TrainingInfo( +columns: Array[FeatureVector], +instanceWeights: Array[Double], +nodeOffsets: Array[(Int, Int)], +activeNodes: Array[LearningNode]) extends Serializable { + + // pre-allocated temporary buffers that we use to sort + // instances in left and right children during update + val tempVals: Array[Int] = new Array[Int](columns(0).values.length) + val tempIndices: Array[Int] = new Array[Int](columns(0).values.length) + + /** For debugging */ + override def toString: String = { +"PartitionInfo(" + + " columns: {\n" + + columns.mkString(",\n") + + " },\n" + + s" nodeOffsets: ${nodeOffsets.mkString(", ")},\n" + + s" activeNodes: ${activeNodes.iterator.mkString(", ")},\n" + + ")\n" + } + + /** + * Update columns and nodeOffsets for the next level of the tree. + * + * Update columns: + * For each (previously) active node, + * Compute bitset indicating whether each training instance under the node splits left/right + * For each column, + * Sort corresponding range of instances based on bitset. + * Update nodeOffsets, activeNodes: + * Split offsets for nodes which split (which can be identified using the bitset). 
+ * + * @return Updated partition info + */ + def update(splits: Array[Array[Split]], newActiveNodes: Array[LearningNode]): TrainingInfo = { +// Create buffers for storing our new arrays of node offsets & impurities +val newNodeOffsets = new ArrayBuffer[(Int, Int)]() +// Update (per-node) sorting of each column to account for creation of new nodes +var nodeIdx = 0 +while (nodeIdx < activeNodes.length) { + val node = activeNodes(nodeIdx) + // Get new active node offsets from active nodes that were split + if (node.split.isDefined) { +// Get split and FeatureVector corresponding to feature for split +val split = node.split.get +val col = columns(split.featureIndex) +val (from, to) = nodeOffsets(nodeIdx) +// Compute bitset indicating whether
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r144786233 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/FeatureVector.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.util.collection.BitSet + +/** + * Stores values for a single training data column (a single continuous or categorical feature). + * + * Values are currently stored in a dense representation only. + * TODO: Support sparse storage (to optimize deeper levels of the tree), and maybe compressed + * storage (to optimize upper levels of the tree). + * + * TODO: Sort feature values to support more complicated splitting logic (e.g. considering every + * possible continuous split instead of discretizing continuous features). + * + * NOTE: We could add sorting of feature values in this PR; the only changed required would be to + * sort feature values at construction-time. Sorting might improve locality during stats + * aggregation (we'd frequently update the same O(statsSize) array for a (feature, bin), + * instead of frequently updating for the same feature). 
+ * + * @param featureArity For categorical features, this gives the number of categories. + * For continuous features, this should be set to 0. + * @param rowIndices Optional: rowIndices(i) is the row index of the ith feature value (values(i)) + * If unspecified, feature values are assumed to be ordered by row (i.e. values(i) + * is a feature value from the ith row). + */ +private[impl] class FeatureVector( +val featureIndex: Int, +val featureArity: Int, +val values: Array[Int], +private val rowIndices: Option[Array[Int]]) + extends Serializable { + // Associates feature values with training point rows. indices(i) = training point index + // (row index) of ith feature value + val indices = rowIndices.getOrElse(values.indices.toArray) + + def isCategorical: Boolean = featureArity > 0 + + /** For debugging */ + override def toString: String = { +" FeatureVector(" + + s"featureIndex: $featureIndex,\n" + + s"featureType: ${if (featureArity == 0) "Continuous" else "Categorical"},\n" + + s"featureArity: $featureArity,\n" + + s"values: ${values.mkString(", ")},\n" + + s"indices: ${indices.mkString(", ")},\n" + + " )" + } + + def deepCopy(): FeatureVector = +new FeatureVector(featureIndex, featureArity, values.clone(), Some(indices.clone())) + + override def equals(other: Any): Boolean = { +other match { + case o: FeatureVector => +featureIndex == o.featureIndex && featureArity == o.featureArity && + values.sameElements(o.values) && indices.sameElements(o.indices) + case _ => false +} + } + + /** + * Reorders the subset of feature values at indices [from, to) in the passed-in column + * according to the split information encoded in instanceBitVector (feature values for rows + * that split left appear before feature values for rows that split right). 
+ * + * @param numLeftRows Number of rows on the left side of the split + * @param tempVals Destination buffer for reordered feature values + * @param tempIndices Destination buffer for row indices corresponding to reordered feature values + * @param instanceBitVector instanceBitVector(i) = true if the row for the ith feature + * value splits right, false otherwise + */ + private[ml] def updateForSplit( + from: Int, + to: Int, + numLeftRows: Int, + tempVals: Array[Int], + tempIndices: Array[Int], + instanceBitVector: BitSet): Unit = { + +//
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r144790384 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/AggUpdateUtils.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.Split + +/** + * Helpers for updating DTStatsAggregators during collection of sufficient stats for tree training. + */ +private[impl] object AggUpdateUtils { + + /** + * Updates the parent node stats of the passed-in impurity aggregator with the labels + * corresponding to the feature values at indices [from, to). + */ + private[impl] def updateParentImpurity( + statsAggregator: DTStatsAggregator, + col: FeatureVector, + from: Int, + to: Int, + instanceWeights: Array[Double], + labels: Array[Double]): Unit = { +from.until(to).foreach { idx => + val rowIndex = col.indices(idx) + val label = labels(rowIndex) + statsAggregator.updateParent(label, instanceWeights(rowIndex)) +} + } + + /** + * Update aggregator for an (unordered feature, label) pair + * @param splits Array of arrays of splits for each feature; splits(i) = splits for feature i. 
+ */ + private[impl] def updateUnorderedFeature( + agg: DTStatsAggregator, + featureValue: Int, + label: Double, + featureIndex: Int, + featureIndexIdx: Int, + splits: Array[Array[Split]], --- End diff -- You only need to pass in `featureSplit: Array[Split]`; don't pass all the splits for all features. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r144789990 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/AggUpdateUtils.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree.Split + +/** + * Helpers for updating DTStatsAggregators during collection of sufficient stats for tree training. + */ +private[impl] object AggUpdateUtils { + + /** + * Updates the parent node stats of the passed-in impurity aggregator with the labels + * corresponding to the feature values at indices [from, to). + */ + private[impl] def updateParentImpurity( + statsAggregator: DTStatsAggregator, + col: FeatureVector, --- End diff -- Actually, `updateParentImpurity` is not related to any particular feature column; the `feature` column is passed in here only so that its `indices` array can be used, so passing any feature column would work. This looks odd, though; maybe it can be designed better. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r144789015 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/TrainingInfo.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.ml.tree.{LearningNode, Split} +import org.apache.spark.util.collection.BitSet + +/** + * Maintains intermediate state of data (columns) and tree during local tree training. + * Primary local tree training data structure; contains all information required to describe + * the state of the algorithm at any point during learning.?? + * + * Nodes are indexed left-to-right along the periphery of the tree, with 0-based indices. + * The "periphery" is the set of leaf nodes (active and inactive). + * + * @param columns Array of columns. + * Each column is sorted first by nodes (left-to-right along the tree periphery); + * all columns share this first level of sorting. + * Within each node's group, each column is sorted based on feature value; + * this second level of sorting differs across columns. 
+ * @param instanceWeights Array of weights for each training example + * @param nodeOffsets Offsets into the columns indicating the first level of sorting (by node). + * The rows corresponding to the node activeNodes(i) are in the range + * [nodeOffsets(i)(0), nodeOffsets(i)(1)) . + * @param activeNodes Nodes which are active (still being split). + * Inactive nodes are known to be leaves in the final tree. + */ +private[impl] case class TrainingInfo( +columns: Array[FeatureVector], +instanceWeights: Array[Double], --- End diff -- The `instanceWeights` array is never updated between iterations, so why put it in the `TrainingInfo` structure? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org