[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-14 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r151019591
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala ---
@@ -852,6 +662,41 @@ private[spark] object RandomForest extends Logging {
   }
 
   /**
+   * Find the best split for a node.
+   *
+   * @param binAggregates Bin statistics.
+   * @return tuple for best split: (Split, information gain, prediction at node)
+   */
+  private[tree] def binsToBestSplit(
+      binAggregates: DTStatsAggregator,
+      splits: Array[Array[Split]],
+      featuresForNode: Option[Array[Int]],
+      node: LearningNode): (Split, ImpurityStats) = {
+    val validFeatureSplits = getNonConstantFeatures(binAggregates.metadata, featuresForNode)
+    // For each (feature, split), calculate the gain, and select the best (feature, split).
+    val parentImpurityCalc = if (node.stats == null) None else Some(node.stats.impurityCalculator)
--- End diff --

I believe so; the nodes at the top level are created 
([RandomForest.scala:178](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala#L178)) 
with [`LearningNode.emptyNode`](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala#L341), 
which sets `node.stats = null`.

I could change this to check node depth (via node index), but if we're 
planning on deprecating node indices in the future it might be best not to.
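
For illustration, here is a minimal self-contained sketch of the invariant being relied on. The `NodeSketch`/`StatsSketch` names are hypothetical stand-ins, not the actual Spark classes: an "empty" top-level node carries `stats = null`, and wrapping the nullable field in an `Option` keeps the null check in one place.

```scala
// Hypothetical stand-ins for LearningNode/ImpurityStats, for illustration only.
final case class StatsSketch(impurity: Double, impurityCalculator: String)
final class NodeSketch(var stats: StatsSketch)

object NodeSketch {
  // Mirrors LearningNode.emptyNode: a fresh top-level node has no stats yet.
  def emptyNode: NodeSketch = new NodeSketch(stats = null)
}

// Option(x) maps null to None, so top-level nodes yield no parent calculator.
def parentImpurityCalc(node: NodeSketch): Option[String] =
  Option(node.stats).map(_.impurityCalculator)

assert(parentImpurityCalc(NodeSketch.emptyNode).isEmpty)
```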


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-14 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r151017375
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.{CategoricalSplit, Split}
+import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Utility methods for choosing splits during local & distributed tree training. */
+private[impl] object SplitUtils {
+
+  /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */
+  private def sortByCentroid(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int): List[Int] = {
+    /* Each bin is one category (feature value).
+     * The bins are ordered based on centroidForCategories, and this ordering determines which
+     * splits are considered.  (With K categories, we consider K - 1 possible splits.)
+     *
+     * centroidForCategories is a list: (category, centroid)
+     */
+    val numCategories = binAggregates.metadata.numBins(featureIndex)
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+
+    val centroidForCategories = Range(0, numCategories).map { featureValue =>
+      val categoryStats =
+        binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
+      val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats)
+      (featureValue, centroid)
+    }
+    // TODO(smurching): How to handle logging statements like these?
+    // logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(","))
+    // bins sorted by centroids
+    val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1)
+    // logDebug("Sorted centroids for categorical variable = " +
+    //   categoriesSortedByCentroid.mkString(","))
+    categoriesSortedByCentroid
+  }
+
+  /**
+   * Find the best split for an unordered categorical feature at a single node.
+   *
+   * Algorithm:
+   *  - Considers all possible subsets (exponentially many)
+   *
+   * @param featureIndex  Global index of feature being split.
+   * @param featureIndexIdx Index of feature being split within subset of features for current node.
+   * @param featureSplits Array of splits for the current feature
+   * @param parentCalculator Optional: ImpurityCalculator containing impurity stats for current node
+   * @return  (best split, statistics for split)  If no valid split was found, the returned
+   *          ImpurityStats instance will be invalid (have member valid = false).
+   */
+  private[impl] def chooseUnorderedCategoricalSplit(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int,
+      featureSplits: Array[Split],
+      parentCalculator: Option[ImpurityCalculator] = None): (Split, ImpurityStats) = {
+    // Unordered categorical feature
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+    val numSplits = binAggregates.metadata.numSplits(featureIndex)
+    var parentCalc = parentCalculator
+    val (bestFeatureSplitIndex, bestFeatureGainStats) =
+      Range(0, numSplits).map { splitIndex =>
+        val leftChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, splitIndex)
+        val rightChildStats = binAggregates.getParentImpurityCalculator()
+          .subtract(leftChildStats)
+        val gainAndImpurityStats = ImpurityUtils.calculateImpurityStats(parentCalc,
+          leftChildStats, rightChildStats, binAggregates.metadata)
+        // Compute parent stats once, when considering first split for current feature
+        if

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-14 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r151011913
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala ---
@@ -627,221 +621,37 @@ private[spark] object RandomForest extends Logging {
   }
 
   /**
-   * Calculate the impurity statistics for a given (feature, split) based upon left/right
-   * aggregates.
-   *
-   * @param stats the recycle impurity statistics for this feature's all splits,
-   *              only 'impurity' and 'impurityCalculator' are valid between each iteration
-   * @param leftImpurityCalculator left node aggregates for this (feature, split)
-   * @param rightImpurityCalculator right node aggregate for this (feature, split)
-   * @param metadata learning and dataset metadata for DecisionTree
-   * @return Impurity statistics for this (feature, split)
+   * Return a list of pairs (featureIndexIdx, featureIndex) where featureIndex is the global
+   * (across all trees) index of a feature and featureIndexIdx is the index of a feature within the
+   * list of features for a given node. Filters out constant features (features with 0 splits)
    */
-  private def calculateImpurityStats(
-      stats: ImpurityStats,
-      leftImpurityCalculator: ImpurityCalculator,
-      rightImpurityCalculator: ImpurityCalculator,
-      metadata: DecisionTreeMetadata): ImpurityStats = {
-
-    val parentImpurityCalculator: ImpurityCalculator = if (stats == null) {
-      leftImpurityCalculator.copy.add(rightImpurityCalculator)
-    } else {
-      stats.impurityCalculator
-    }
-
-    val impurity: Double = if (stats == null) {
-      parentImpurityCalculator.calculate()
-    } else {
-      stats.impurity
-    }
-
-    val leftCount = leftImpurityCalculator.count
-    val rightCount = rightImpurityCalculator.count
-
-    val totalCount = leftCount + rightCount
-
-    // If left child or right child doesn't satisfy minimum instances per node,
-    // then this split is invalid, return invalid information gain stats.
-    if ((leftCount < metadata.minInstancesPerNode) ||
-      (rightCount < metadata.minInstancesPerNode)) {
-      return ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator)
-    }
-
-    val leftImpurity = leftImpurityCalculator.calculate() // Note: This equals 0 if count = 0
-    val rightImpurity = rightImpurityCalculator.calculate()
-
-    val leftWeight = leftCount / totalCount.toDouble
-    val rightWeight = rightCount / totalCount.toDouble
-
-    val gain = impurity - leftWeight * leftImpurity - rightWeight * rightImpurity
-
-    // if information gain doesn't satisfy minimum information gain,
-    // then this split is invalid, return invalid information gain stats.
-    if (gain < metadata.minInfoGain) {
-      return ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator)
+  private[impl] def getNonConstantFeatures(
+      metadata: DecisionTreeMetadata,
+      featuresForNode: Option[Array[Int]]): Seq[(Int, Int)] = {
+    Range(0, metadata.numFeaturesPerNode).map { featureIndexIdx =>
--- End diff --

At some point during refactoring I was hitting errors caused by a stateful 
operation within a `map` over the output of this method (IIRC the result of the 
`map` was accessed repeatedly, causing the stateful operation to inadvertently 
run multiple times).

However, using `withFilter` and `view` now seems to work, so I'll change it 
back :)
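
For anyone hitting this later, a standalone sketch of the pitfall (generic Scala, not the Spark code): a side-effecting `map` over a lazy `view` re-runs the side effect on every traversal of the result.

```scala
var evaluations = 0  // stands in for the stateful operation

// Lazy: the map body runs again on every traversal of the view.
val lazyMapped = (0 until 3).view.map { i => evaluations += 1; i * 2 }
lazyMapped.sum
lazyMapped.sum
println(evaluations)  // 6: the body ran twice per element

evaluations = 0
// Forcing the view once materializes the results, so later traversals are free.
val materialized = lazyMapped.toIndexedSeq
materialized.sum
materialized.sum
println(evaluations)  // 3: the body ran exactly once per element
```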


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-14 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r151011879
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala ---
@@ -112,7 +113,7 @@ private[spark] object ImpurityStats {
* minimum number of instances per node.
*/
   def getInvalidImpurityStats(impurityCalculator: ImpurityCalculator): ImpurityStats = {
-    new ImpurityStats(Double.MinValue, impurityCalculator.calculate(),
+    new ImpurityStats(Double.MinValue, impurity = -1,
--- End diff --

I changed this to -1 here since node impurity would eventually get set 
to -1 anyway when `LearningNodes` with invalid `ImpurityStats` were converted 
into decision tree leaf nodes (see 
[`LearningNode.toNode`](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala#L279)).

---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-10 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r150349566
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala ---
@@ -852,6 +662,41 @@ private[spark] object RandomForest extends Logging {
   }
 
   /**
+   * Find the best split for a node.
+   *
+   * @param binAggregates Bin statistics.
+   * @return tuple for best split: (Split, information gain, prediction at node)
+   */
+  private[tree] def binsToBestSplit(
+      binAggregates: DTStatsAggregator,
+      splits: Array[Array[Split]],
+      featuresForNode: Option[Array[Int]],
+      node: LearningNode): (Split, ImpurityStats) = {
+    val validFeatureSplits = getNonConstantFeatures(binAggregates.metadata, featuresForNode)
+    // For each (feature, split), calculate the gain, and select the best (feature, split).
+    val parentImpurityCalc = if (node.stats == null) None else Some(node.stats.impurityCalculator)
--- End diff --

Note to check: will `node.stats` be null at the top level for sure?


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-10 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r150159113
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.{CategoricalSplit, Split}
+import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Utility methods for choosing splits during local & distributed tree training. */
+private[impl] object SplitUtils {
+
+  /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */
+  private def sortByCentroid(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int): List[Int] = {
+    /* Each bin is one category (feature value).
+     * The bins are ordered based on centroidForCategories, and this ordering determines which
+     * splits are considered.  (With K categories, we consider K - 1 possible splits.)
+     *
+     * centroidForCategories is a list: (category, centroid)
+     */
+    val numCategories = binAggregates.metadata.numBins(featureIndex)
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+
+    val centroidForCategories = Range(0, numCategories).map { featureValue =>
+      val categoryStats =
+        binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
+      val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats)
+      (featureValue, centroid)
+    }
+    // TODO(smurching): How to handle logging statements like these?
+    // logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(","))
+    // bins sorted by centroids
+    val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1)
+    // logDebug("Sorted centroids for categorical variable = " +
+    //   categoriesSortedByCentroid.mkString(","))
+    categoriesSortedByCentroid
+  }
+
+  /**
+   * Find the best split for an unordered categorical feature at a single node.
+   *
+   * Algorithm:
+   *  - Considers all possible subsets (exponentially many)
+   *
+   * @param featureIndex  Global index of feature being split.
+   * @param featureIndexIdx Index of feature being split within subset of features for current node.
+   * @param featureSplits Array of splits for the current feature
+   * @param parentCalculator Optional: ImpurityCalculator containing impurity stats for current node
+   * @return  (best split, statistics for split)  If no valid split was found, the returned
+   *          ImpurityStats instance will be invalid (have member valid = false).
+   */
+  private[impl] def chooseUnorderedCategoricalSplit(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int,
+      featureSplits: Array[Split],
+      parentCalculator: Option[ImpurityCalculator] = None): (Split, ImpurityStats) = {
+    // Unordered categorical feature
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+    val numSplits = binAggregates.metadata.numSplits(featureIndex)
+    var parentCalc = parentCalculator
+    val (bestFeatureSplitIndex, bestFeatureGainStats) =
+      Range(0, numSplits).map { splitIndex =>
+        val leftChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, splitIndex)
+        val rightChildStats = binAggregates.getParentImpurityCalculator()
+          .subtract(leftChildStats)
+        val gainAndImpurityStats = ImpurityUtils.calculateImpurityStats(parentCalc,
+          leftChildStats, rightChildStats, binAggregates.metadata)
+        // Compute parent stats once, when considering first split for current feature
+        if

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-10 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r150159513
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.{CategoricalSplit, Split}
+import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Utility methods for choosing splits during local & distributed tree training. */
+private[impl] object SplitUtils {
+
+  /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */
+  private def sortByCentroid(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int): List[Int] = {
+    /* Each bin is one category (feature value).
+     * The bins are ordered based on centroidForCategories, and this ordering determines which
+     * splits are considered.  (With K categories, we consider K - 1 possible splits.)
+     *
+     * centroidForCategories is a list: (category, centroid)
+     */
+    val numCategories = binAggregates.metadata.numBins(featureIndex)
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+
+    val centroidForCategories = Range(0, numCategories).map { featureValue =>
+      val categoryStats =
+        binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
+      val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats)
+      (featureValue, centroid)
+    }
+    // TODO(smurching): How to handle logging statements like these?
+    // logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(","))
+    // bins sorted by centroids
+    val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1)
+    // logDebug("Sorted centroids for categorical variable = " +
+    //   categoriesSortedByCentroid.mkString(","))
+    categoriesSortedByCentroid
+  }
+
+  /**
+   * Find the best split for an unordered categorical feature at a single node.
+   *
+   * Algorithm:
+   *  - Considers all possible subsets (exponentially many)
+   *
+   * @param featureIndex  Global index of feature being split.
+   * @param featureIndexIdx Index of feature being split within subset of features for current node.
+   * @param featureSplits Array of splits for the current feature
+   * @param parentCalculator Optional: ImpurityCalculator containing impurity stats for current node
+   * @return  (best split, statistics for split)  If no valid split was found, the returned
+   *          ImpurityStats instance will be invalid (have member valid = false).
+   */
+  private[impl] def chooseUnorderedCategoricalSplit(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int,
+      featureSplits: Array[Split],
+      parentCalculator: Option[ImpurityCalculator] = None): (Split, ImpurityStats) = {
+    // Unordered categorical feature
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+    val numSplits = binAggregates.metadata.numSplits(featureIndex)
+    var parentCalc = parentCalculator
+    val (bestFeatureSplitIndex, bestFeatureGainStats) =
+      Range(0, numSplits).map { splitIndex =>
+        val leftChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, splitIndex)
+        val rightChildStats = binAggregates.getParentImpurityCalculator()
+          .subtract(leftChildStats)
+        val gainAndImpurityStats = ImpurityUtils.calculateImpurityStats(parentCalc,
+          leftChildStats, rightChildStats, binAggregates.metadata)
+        // Compute parent stats once, when considering first split for current feature
+        if

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-10 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r150158027
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala ---
@@ -627,221 +621,37 @@ private[spark] object RandomForest extends Logging {
   }
 
   /**
-   * Calculate the impurity statistics for a given (feature, split) based upon left/right
-   * aggregates.
-   *
-   * @param stats the recycle impurity statistics for this feature's all splits,
-   *              only 'impurity' and 'impurityCalculator' are valid between each iteration
-   * @param leftImpurityCalculator left node aggregates for this (feature, split)
-   * @param rightImpurityCalculator right node aggregate for this (feature, split)
-   * @param metadata learning and dataset metadata for DecisionTree
-   * @return Impurity statistics for this (feature, split)
+   * Return a list of pairs (featureIndexIdx, featureIndex) where featureIndex is the global
+   * (across all trees) index of a feature and featureIndexIdx is the index of a feature within the
+   * list of features for a given node. Filters out constant features (features with 0 splits)
    */
-  private def calculateImpurityStats(
-      stats: ImpurityStats,
-      leftImpurityCalculator: ImpurityCalculator,
-      rightImpurityCalculator: ImpurityCalculator,
-      metadata: DecisionTreeMetadata): ImpurityStats = {
-
-    val parentImpurityCalculator: ImpurityCalculator = if (stats == null) {
-      leftImpurityCalculator.copy.add(rightImpurityCalculator)
-    } else {
-      stats.impurityCalculator
-    }
-
-    val impurity: Double = if (stats == null) {
-      parentImpurityCalculator.calculate()
-    } else {
-      stats.impurity
-    }
-
-    val leftCount = leftImpurityCalculator.count
-    val rightCount = rightImpurityCalculator.count
-
-    val totalCount = leftCount + rightCount
-
-    // If left child or right child doesn't satisfy minimum instances per node,
-    // then this split is invalid, return invalid information gain stats.
-    if ((leftCount < metadata.minInstancesPerNode) ||
-      (rightCount < metadata.minInstancesPerNode)) {
-      return ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator)
-    }
-
-    val leftImpurity = leftImpurityCalculator.calculate() // Note: This equals 0 if count = 0
-    val rightImpurity = rightImpurityCalculator.calculate()
-
-    val leftWeight = leftCount / totalCount.toDouble
-    val rightWeight = rightCount / totalCount.toDouble
-
-    val gain = impurity - leftWeight * leftImpurity - rightWeight * rightImpurity
-
-    // if information gain doesn't satisfy minimum information gain,
-    // then this split is invalid, return invalid information gain stats.
-    if (gain < metadata.minInfoGain) {
-      return ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator)
+  private[impl] def getNonConstantFeatures(
+      metadata: DecisionTreeMetadata,
+      featuresForNode: Option[Array[Int]]): Seq[(Int, Int)] = {
+    Range(0, metadata.numFeaturesPerNode).map { featureIndexIdx =>
--- End diff --

Was there a reason to remove the use of view and withFilter here?  With the 
output of this method going through further Seq operations, I would expect the 
previous implementation to be more efficient.
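
For reference, a generic sketch of the efficiency point (illustrative names, not the Spark code): the strict pipeline allocates an intermediate collection per stage, while `view` + `withFilter` fuse the stages into whatever traversal the caller eventually performs.

```scala
// Hypothetical stand-in for metadata.numSplits.
def numSplits(featureIndex: Int): Int = if (featureIndex % 2 == 0) 0 else 3

val numFeatures = 100000

// Strict: builds one Seq for the map and another for the filter up front.
val strictPairs: Seq[(Int, Int)] =
  Range(0, numFeatures).map(i => (i, i)).filter { case (_, f) => numSplits(f) != 0 }

// Lazy: no intermediate collections; elements flow one at a time through
// map/withFilter when a downstream operation finally traverses the view.
val lazyPairs =
  Range(0, numFeatures).view
    .map(i => (i, i))
    .withFilter { case (_, f) => numSplits(f) != 0 }
```

The trade-off, as noted elsewhere in this thread, is that any side effect inside the lazy pipeline re-runs on each traversal.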


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-10 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r150160368
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.{CategoricalSplit, Split}
+import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Utility methods for choosing splits during local & distributed tree training. */
+private[impl] object SplitUtils {
+
+  /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */
+  private def sortByCentroid(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int): List[Int] = {
+    /* Each bin is one category (feature value).
+     * The bins are ordered based on centroidForCategories, and this ordering determines which
+     * splits are considered.  (With K categories, we consider K - 1 possible splits.)
+     *
+     * centroidForCategories is a list: (category, centroid)
+     */
+    val numCategories = binAggregates.metadata.numBins(featureIndex)
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+
+    val centroidForCategories = Range(0, numCategories).map { featureValue =>
+      val categoryStats =
+        binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
+      val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats)
+      (featureValue, centroid)
+    }
+    // TODO(smurching): How to handle logging statements like these?
+    // logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(","))
+    // bins sorted by centroids
+    val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1)
+    // logDebug("Sorted centroids for categorical variable = " +
+    //   categoriesSortedByCentroid.mkString(","))
+    categoriesSortedByCentroid
+  }
+
+  /**
+   * Find the best split for an unordered categorical feature at a single node.
+   *
+   * Algorithm:
+   *  - Considers all possible subsets (exponentially many)
+   *
+   * @param featureIndex  Global index of feature being split.
+   * @param featureIndexIdx Index of feature being split within subset of features for current node.
+   * @param featureSplits Array of splits for the current feature
+   * @param parentCalculator Optional: ImpurityCalculator containing impurity stats for current node
+   * @return  (best split, statistics for split)  If no valid split was found, the returned
+   *          ImpurityStats instance will be invalid (have member valid = false).
+   */
+  private[impl] def chooseUnorderedCategoricalSplit(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int,
+      featureSplits: Array[Split],
+      parentCalculator: Option[ImpurityCalculator] = None): (Split, ImpurityStats) = {
+    // Unordered categorical feature
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+    val numSplits = binAggregates.metadata.numSplits(featureIndex)
+    var parentCalc = parentCalculator
--- End diff --

It'd be nice to calculate the parentCalc right away here, if needed.  That 
seems possible just by taking the first candidate split.  Then we could 
simplify calculateImpurityStats by not passing in parentCalc as an option.
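
A hedged sketch of that simplification, slotted into the quoted method and using its names (assumes at least one candidate split exists): derive the parent calculator once from the first candidate split, since any split's left and right child stats sum to the parent's, and then pass a plain `ImpurityCalculator` into `calculateImpurityStats`.

```scala
// Sketch only: resolve the parent calculator eagerly instead of threading an
// Option through the per-split loop. Assumes numSplits > 0.
val parentCalc: ImpurityCalculator = parentCalculator.getOrElse {
  val firstLeft = binAggregates.getImpurityCalculator(nodeFeatureOffset, 0)
  val firstRight = binAggregates.getParentImpurityCalculator().subtract(firstLeft)
  // left + right covers all of the node's points, i.e. the parent's stats.
  firstLeft.copy.add(firstRight)
}
// calculateImpurityStats could then take parentCalc: ImpurityCalculator
// directly, rather than an Option resolved inside the loop.
```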


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-10 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r150352747
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala ---
@@ -112,7 +113,7 @@ private[spark] object ImpurityStats {
* minimum number of instances per node.
*/
   def getInvalidImpurityStats(impurityCalculator: ImpurityCalculator): ImpurityStats = {
-    new ImpurityStats(Double.MinValue, impurityCalculator.calculate(),
+    new ImpurityStats(Double.MinValue, impurity = -1,
--- End diff --

Q: Why -1 here?


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-10 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r150154413
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/AggUpdateUtils.scala ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.Split
+
+/**
+ * Helpers for updating DTStatsAggregators during collection of sufficient stats for tree training.
+ */
+private[impl] object AggUpdateUtils {
+
+  /**
+   * Updates the parent node stats of the passed-in impurity aggregator with the labels
+   * corresponding to the feature values at indices [from, to).
+   * @param indices Array of row indices for feature values; indices(i) = row index of the ith
+   *                feature value
+   */
+  private[impl] def updateParentImpurity(
+      statsAggregator: DTStatsAggregator,
+      indices: Array[Int],
+      from: Int,
+      to: Int,
+      instanceWeights: Array[Double],
+      labels: Array[Double]): Unit = {
+    from.until(to).foreach { idx =>
+      val rowIndex = indices(idx)
+      val label = labels(rowIndex)
+      statsAggregator.updateParent(label, instanceWeights(rowIndex))
+    }
+  }
+
+  /**
+   * Update aggregator for an (unordered feature, label) pair
+   * @param featureSplits Array of splits for the current feature
+   */
+  private[impl] def updateUnorderedFeature(
+      agg: DTStatsAggregator,
+      featureValue: Int,
+      label: Double,
+      featureIndex: Int,
+      featureIndexIdx: Int,
+      featureSplits: Array[Split],
+      instanceWeight: Double): Unit = {
+    val leftNodeFeatureOffset = agg.getFeatureOffset(featureIndexIdx)
+    // Each unordered split has a corresponding bin for impurity stats of data points that fall
+    // onto the left side of the split. For each unordered split, update left-side bin if applicable
+    // for the current data point.
+    val numSplits = agg.metadata.numSplits(featureIndex)
+    var splitIndex = 0
+    while (splitIndex < numSplits) {
+      if (featureSplits(splitIndex).shouldGoLeft(featureValue, featureSplits)) {
+        agg.featureUpdate(leftNodeFeatureOffset, splitIndex, label, instanceWeight)
+      }
+      splitIndex += 1
+    }
+  }
+
+  /** Update aggregator for an (ordered feature, label) pair */
+  private[impl] def updateOrderedFeature(
+      agg: DTStatsAggregator,
+      featureValue: Int,
+      label: Double,
+      featureIndex: Int,
--- End diff --

featureIndex is not used


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-11-10 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r150309552
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/SplitUtils.scala ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.{CategoricalSplit, Split}
+import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Utility methods for choosing splits during local & distributed tree training. */
+private[impl] object SplitUtils {
+
+  /** Sorts ordered feature categories by label centroid, returning an ordered list of categories */
+  private def sortByCentroid(
+      binAggregates: DTStatsAggregator,
+      featureIndex: Int,
+      featureIndexIdx: Int): List[Int] = {
+    /* Each bin is one category (feature value).
+     * The bins are ordered based on centroidForCategories, and this ordering determines which
+     * splits are considered.  (With K categories, we consider K - 1 possible splits.)
+     *
+     * centroidForCategories is a list: (category, centroid)
+     */
+    val numCategories = binAggregates.metadata.numBins(featureIndex)
+    val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
+
+    val centroidForCategories = Range(0, numCategories).map { featureValue =>
+      val categoryStats =
+        binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
+      val centroid = ImpurityUtils.getCentroid(binAggregates.metadata, categoryStats)
+      (featureValue, centroid)
+    }
+    // TODO(smurching): How to handle logging statements like these?
--- End diff --

What's the issue?  You should be able to call logDebug if this object 
inherits from org.apache.spark.internal.Logging
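
For illustration, a minimal sketch of that approach (the method body and signature are abbreviated here; the real `sortByCentroid` takes a `DTStatsAggregator`):

```scala
package org.apache.spark.ml.tree.impl

import org.apache.spark.internal.Logging

// Sketch: mixing in Spark's Logging trait makes logDebug available, so the
// commented-out statements in the quoted diff could be restored as-is.
private[impl] object SplitUtils extends Logging {
  // Abbreviated, illustrative signature.
  private def sortByCentroid(centroidForCategories: Seq[(Int, Double)]): List[Int] = {
    logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(","))
    val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2).map(_._1)
    logDebug("Sorted centroids for categorical variable = " +
      categoriesSortedByCentroid.mkString(","))
    categoriesSortedByCentroid
  }
}
```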


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-26 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r147317401
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+      input: Array[TreePoint],
+      instanceWeights: Array[Double],
+      node: LearningNode,
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // The case with 1 node (depth = 0) is handled separately.
+    // This allows all iterations in the depth > 0 case to use the same code.
+    // TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of
+    //       other parameters).
+    if (metadata.maxDepth == 0) {
+      return node.toNode
+    }
+
+    // Prepare column store.
+    //   Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue.
+    val colStoreInit: Array[Array[Int]]
+      = LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+    val labels = input.map(_.label)
+
+    // Fit a regression model on the dataset, throwing an error if metadata indicates that
+    // we should train a classifier.
+    // TODO: Add support for training classifiers
+    if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+      throw new UnsupportedOperationException("Local training of a decision tree classifier is " +
+        "unsupported; currently, only regression is supported")
+    } else {
+      trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits)
+    }
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+      rootNode: LearningNode,
+      colStoreInit: Array[Array[Int]],
+      instanceWeights: Array[Double],
+      labels: Array[Double],
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // Sort each column by decision tree node.
+    val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) =>
+      val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0)
+      FeatureVector(featureIndex, featureArity, col)
+    }
+
+    val numRows = colStore.headOption match {
+      case None => 0
+      case Some(column) => column.values.length
+    }
+
+    // Create a new TrainingInfo describing the status of our partially-trained subtree
+    // at each iteration of training
+    var trainingInfo: TrainingInfo = 

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-26 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r147307553
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+      input: Array[TreePoint],
+      instanceWeights: Array[Double],
+      node: LearningNode,
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // The case with 1 node (depth = 0) is handled separately.
+    // This allows all iterations in the depth > 0 case to use the same code.
+    // TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of
+    //       other parameters).
+    if (metadata.maxDepth == 0) {
+      return node.toNode
+    }
+
+    // Prepare column store.
+    //   Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue.
+    val colStoreInit: Array[Array[Int]]
+      = LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+    val labels = input.map(_.label)
+
+    // Fit a regression model on the dataset, throwing an error if metadata indicates that
+    // we should train a classifier.
+    // TODO: Add support for training classifiers
+    if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+      throw new UnsupportedOperationException("Local training of a decision tree classifier is " +
+        "unsupported; currently, only regression is supported")
+    } else {
+      trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits)
+    }
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+      rootNode: LearningNode,
+      colStoreInit: Array[Array[Int]],
+      instanceWeights: Array[Double],
+      labels: Array[Double],
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // Sort each column by decision tree node.
+    val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) =>
+      val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0)
+      FeatureVector(featureIndex, featureArity, col)
+    }
+
+    val numRows = colStore.headOption match {
+      case None => 0
+      case Some(column) => column.values.length
+    }
+
+    // Create a new PartitionInfo describing the status of our partially-trained subtree
+    // at each iteration of training
+    var trainingInfo: TrainingInfo = 

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-25 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r147036693
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+      input: Array[TreePoint],
+      instanceWeights: Array[Double],
+      node: LearningNode,
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // The case with 1 node (depth = 0) is handled separately.
+    // This allows all iterations in the depth > 0 case to use the same code.
+    // TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of
+    //       other parameters).
+    if (metadata.maxDepth == 0) {
+      return node.toNode
+    }
+
+    // Prepare column store.
+    //   Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue.
+    val colStoreInit: Array[Array[Int]]
+      = LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+    val labels = input.map(_.label)
+
+    // Fit a regression model on the dataset, throwing an error if metadata indicates that
+    // we should train a classifier.
+    // TODO: Add support for training classifiers
+    if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+      throw new UnsupportedOperationException("Local training of a decision tree classifier is " +
+        "unsupported; currently, only regression is supported")
+    } else {
+      trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits)
+    }
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+      rootNode: LearningNode,
+      colStoreInit: Array[Array[Int]],
+      instanceWeights: Array[Double],
+      labels: Array[Double],
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // Sort each column by decision tree node.
+    val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) =>
+      val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0)
+      FeatureVector(featureIndex, featureArity, col)
+    }
+
+    val numRows = colStore.headOption match {
+      case None => 0
+      case Some(column) => column.values.length
+    }
+
+    // Create a new PartitionInfo describing the status of our partially-trained subtree
+    // at each iteration of training
+    var trainingInfo: TrainingInfo = 

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-25 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r146981434
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+      input: Array[TreePoint],
+      instanceWeights: Array[Double],
+      node: LearningNode,
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // The case with 1 node (depth = 0) is handled separately.
+    // This allows all iterations in the depth > 0 case to use the same code.
+    // TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of
+    //       other parameters).
+    if (metadata.maxDepth == 0) {
+      return node.toNode
+    }
+
+    // Prepare column store.
+    //   Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue.
+    val colStoreInit: Array[Array[Int]]
+      = LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+    val labels = input.map(_.label)
+
+    // Fit a regression model on the dataset, throwing an error if metadata indicates that
+    // we should train a classifier.
+    // TODO: Add support for training classifiers
+    if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+      throw new UnsupportedOperationException("Local training of a decision tree classifier is " +
+        "unsupported; currently, only regression is supported")
+    } else {
+      trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits)
+    }
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+      rootNode: LearningNode,
+      colStoreInit: Array[Array[Int]],
+      instanceWeights: Array[Double],
+      labels: Array[Double],
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // Sort each column by decision tree node.
+    val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) =>
+      val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0)
+      FeatureVector(featureIndex, featureArity, col)
+    }
+
+    val numRows = colStore.headOption match {
+      case None => 0
+      case Some(column) => column.values.length
+    }
+
+    // Create a new PartitionInfo describing the status of our partially-trained subtree
+    // at each iteration of training
+    var trainingInfo: TrainingInfo = 

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-24 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r146735946
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, 
returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting 
tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the 
passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+  input: Array[TreePoint],
+  instanceWeights: Array[Double],
+  node: LearningNode,
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// The case with 1 node (depth = 0) is handled separately.
+// This allows all iterations in the depth > 0 case to use the same 
code.
+// TODO: Check that learning works when maxDepth > 0 but learning 
stops at 1 node (because of
+//   other parameters).
+if (metadata.maxDepth == 0) {
+  return node.toNode
+}
+
+// Prepare column store.
+//   Note: rowToColumnStoreDense checks to make sure numRows < 
Int.MaxValue.
+val colStoreInit: Array[Array[Int]]
+= 
LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+val labels = input.map(_.label)
+
+// Fit a regression model on the dataset, throwing an error if 
metadata indicates that
+// we should train a classifier.
+// TODO: Add support for training classifiers
+if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+  throw new UnsupportedOperationException("Local training of a 
decision tree classifier is " +
+"unsupported; currently, only regression is supported")
+} else {
+  trainRegressor(node, colStoreInit, instanceWeights, labels, 
metadata, splits)
+}
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the 
same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in 
dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+  rootNode: LearningNode,
+  colStoreInit: Array[Array[Int]],
+  instanceWeights: Array[Double],
+  labels: Array[Double],
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// Sort each column by decision tree node.
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { 
case (col, featureIndex) =>
+  val featureArity: Int = 
metadata.featureArity.getOrElse(featureIndex, 0)
+  FeatureVector(featureIndex, featureArity, col)
+}
+
+val numRows = colStore.headOption match {
+  case None => 0
+  case Some(column) => column.values.length
+}
+
+// Create a new TrainingInfo describing the status of our partially-trained
+// subtree at each iteration of training
+var trainingInfo: TrainingInfo = 
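[Archive note: this message is also truncated. For reference,
rowToColumnStoreDense, used in the diff above, transposes the row-major
binned features into one array per feature. A minimal self-contained sketch
of the idea, not Spark's actual implementation:

    // Transpose rows(i)(j) = binned value of feature j for row i into a
    // column-major store; the real utility also checks numRows < Int.MaxValue.
    def rowToColumnStoreDense(rows: Array[Array[Int]]): Array[Array[Int]] = {
      val numRows = rows.length
      val numFeatures = if (numRows == 0) 0 else rows(0).length
      Array.tabulate(numFeatures) { j =>
        Array.tabulate(numRows) { i => rows(i)(j) }
      }
    }

    // rowToColumnStoreDense(Array(Array(1, 2, 3), Array(4, 5, 6)))
    //   == Array(Array(1, 4), Array(2, 5), Array(3, 6))
]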

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-24 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r146731101
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/TrainingInfo.scala ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.ml.tree.{LearningNode, Split}
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Maintains intermediate state of data (columns) and tree during local 
tree training.
+ * Primary local tree training data structure; contains all information 
required to describe
+ * the state of the algorithm at any point during learning.
+ *
+ * Nodes are indexed left-to-right along the periphery of the tree, with 
0-based indices.
+ * The "periphery" is the set of leaf nodes (active and inactive).
+ *
+ * @param columns  Array of columns.
+ * Each column is sorted first by nodes (left-to-right 
along the tree periphery);
+ * all columns share this first level of sorting.
+ * Within each node's group, each column is sorted based 
on feature value;
+ * this second level of sorting differs across columns.
+ * @param instanceWeights Array of weights for each training example
+ * @param nodeOffsets  Offsets into the columns indicating the first level 
of sorting (by node).
+ * The rows corresponding to the node activeNodes(i) 
are in the range
+ * [nodeOffsets(i)(0), nodeOffsets(i)(1)).
+ * @param activeNodes  Nodes which are active (still being split).
+ * Inactive nodes are known to be leaves in the final 
tree.
+ */
+private[impl] case class TrainingInfo(
+columns: Array[FeatureVector],
+instanceWeights: Array[Double],
--- End diff --

Good call, I'll move `instanceWeights` outside `TrainingInfo`
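Roughly (sketch only, untested):

    // TrainingInfo would keep only state that changes between iterations;
    // the immutable weights get threaded through as a separate parameter.
    private[impl] case class TrainingInfo(
        columns: Array[FeatureVector],
        nodeOffsets: Array[(Int, Int)],
        activeNodes: Array[LearningNode]) extends Serializable

    // e.g. a training step would take the weights alongside the state
    // (trainLevel is a made-up name for illustration):
    def trainLevel(
        info: TrainingInfo,
        instanceWeights: Array[Double],
        labels: Array[Double]): TrainingInfo = ???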


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-24 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r146731083
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, 
returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting 
tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the 
passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+  input: Array[TreePoint],
+  instanceWeights: Array[Double],
+  node: LearningNode,
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// The case with 1 node (depth = 0) is handled separately.
+// This allows all iterations in the depth > 0 case to use the same 
code.
+// TODO: Check that learning works when maxDepth > 0 but learning 
stops at 1 node (because of
+//   other parameters).
+if (metadata.maxDepth == 0) {
+  return node.toNode
+}
+
+// Prepare column store.
+//   Note: rowToColumnStoreDense checks to make sure numRows < 
Int.MaxValue.
+val colStoreInit: Array[Array[Int]]
+= 
LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+val labels = input.map(_.label)
+
+// Fit a regression model on the dataset, throwing an error if 
metadata indicates that
+// we should train a classifier.
+// TODO: Add support for training classifiers
+if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+  throw new UnsupportedOperationException("Local training of a 
decision tree classifier is " +
+"unsupported; currently, only regression is supported")
+} else {
+  trainRegressor(node, colStoreInit, instanceWeights, labels, 
metadata, splits)
+}
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the 
same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in 
dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+  rootNode: LearningNode,
+  colStoreInit: Array[Array[Int]],
+  instanceWeights: Array[Double],
+  labels: Array[Double],
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// Sort each column by decision tree node.
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { 
case (col, featureIndex) =>
+  val featureArity: Int = 
metadata.featureArity.getOrElse(featureIndex, 0)
+  FeatureVector(featureIndex, featureArity, col)
+}
+
+val numRows = colStore.headOption match {
+  case None => 0
+  case Some(column) => column.values.length
+}
+
+// Create a new TrainingInfo describing the status of our partially-trained
+// subtree at each iteration of training
+var trainingInfo: TrainingInfo = 

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-24 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r146731070
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/AggUpdateUtils.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.Split
+
+/**
+ * Helpers for updating DTStatsAggregators during collection of sufficient 
stats for tree training.
+ */
+private[impl] object AggUpdateUtils {
+
+  /**
+   * Updates the parent node stats of the passed-in impurity aggregator 
with the labels
+   * corresponding to the feature values at indices [from, to).
+   */
+  private[impl] def updateParentImpurity(
+  statsAggregator: DTStatsAggregator,
+  col: FeatureVector,
+  from: Int,
+  to: Int,
+  instanceWeights: Array[Double],
+  labels: Array[Double]): Unit = {
+from.until(to).foreach { idx =>
+  val rowIndex = col.indices(idx)
+  val label = labels(rowIndex)
+  statsAggregator.updateParent(label, instanceWeights(rowIndex))
+}
+  }
+
+  /**
+   * Update aggregator for an (unordered feature, label) pair
+   * @param splits Array of arrays of splits for each feature; splits(i) = 
splits for feature i.
+   */
+  private[impl] def updateUnorderedFeature(
+  agg: DTStatsAggregator,
+  featureValue: Int,
+  label: Double,
+  featureIndex: Int,
+  featureIndexIdx: Int,
+  splits: Array[Array[Split]],
--- End diff --

Good call, I'll make this change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-24 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r146731072
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/FeatureVector.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Stores values for a single training data column (a single continuous or 
categorical feature).
+ *
+ * Values are currently stored in a dense representation only.
+ * TODO: Support sparse storage (to optimize deeper levels of the tree), 
and maybe compressed
+ *   storage (to optimize upper levels of the tree).
+ *
+ * TODO: Sort feature values to support more complicated splitting logic 
(e.g. considering every
+ *   possible continuous split instead of discretizing continuous 
features).
+ *
+ * NOTE: We could add sorting of feature values in this PR; the only 
change required would be to
+ * sort feature values at construction-time. Sorting might improve 
locality during stats
+ * aggregation (we'd frequently update the same O(statsSize) array for a 
(feature, bin),
+ * instead of frequently updating for the same feature).
+ *
+ * @param featureArity  For categorical features, this gives the number of 
categories.
+ *  For continuous features, this should be set to 0.
+ * @param rowIndices Optional: rowIndices(i) is the row index of the ith 
feature value (values(i))
+ *   If unspecified, feature values are assumed to be 
ordered by row (i.e. values(i)
+ *   is a feature value from the ith row).
+ */
+private[impl] class FeatureVector(
+val featureIndex: Int,
+val featureArity: Int,
+val values: Array[Int],
+private val rowIndices: Option[Array[Int]])
+  extends Serializable {
+  // Associates feature values with training point rows. indices(i) = 
training point index
+  // (row index) of ith feature value
+  val indices = rowIndices.getOrElse(values.indices.toArray)
+
+  def isCategorical: Boolean = featureArity > 0
+
+  /** For debugging */
+  override def toString: String = {
+"  FeatureVector(" +
+  s"featureIndex: $featureIndex,\n" +
+  s"featureType: ${if (featureArity == 0) "Continuous" else 
"Categorical"},\n" +
+  s"featureArity: $featureArity,\n" +
+  s"values: ${values.mkString(", ")},\n" +
+  s"indices: ${indices.mkString(", ")},\n" +
+  "  )"
+  }
+
+  def deepCopy(): FeatureVector =
+new FeatureVector(featureIndex, featureArity, values.clone(), 
Some(indices.clone()))
+
+  override def equals(other: Any): Boolean = {
+other match {
+  case o: FeatureVector =>
+featureIndex == o.featureIndex && featureArity == o.featureArity &&
+  values.sameElements(o.values) && indices.sameElements(o.indices)
+  case _ => false
+}
+  }
+
+  /**
+   * Reorders the subset of feature values at indices [from, to) in the 
passed-in column
+   * according to the split information encoded in instanceBitVector 
(feature values for rows
+   * that split left appear before feature values for rows that split 
right).
+   *
+   * @param numLeftRows Number of rows on the left side of the split
+   * @param tempVals Destination buffer for reordered feature values
+   * @param tempIndices Destination buffer for row indices corresponding 
to reordered feature values
+   * @param instanceBitVector instanceBitVector(i) = true if the row for 
the ith feature
+   *  value splits right, false otherwise
+   */
+  private[ml] def updateForSplit(
+  from: Int,
+  to: Int,
+  numLeftRows: Int,
+  tempVals: Array[Int],
+  tempIndices: Array[Int],
+  instanceBitVector: BitSet): Unit = {
+
+// 
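[Archive note: the method body is truncated here, but the doc comment above
fully specifies the behavior. An illustrative sketch of such a stable
left/right reordering, using Array[Boolean] in place of Spark's BitSet and
assuming the bit vector is indexed relative to `from`:

    def updateForSplit(
        values: Array[Int],
        indices: Array[Int],
        from: Int,
        to: Int,
        numLeftRows: Int,
        tempVals: Array[Int],
        tempIndices: Array[Int],
        splitsRight: Array[Boolean]): Unit = {
      var leftPos = from                  // next free slot for left rows
      var rightPos = from + numLeftRows   // next free slot for right rows
      var idx = from
      while (idx < to) {
        val dest =
          if (splitsRight(idx - from)) { val d = rightPos; rightPos += 1; d }
          else { val d = leftPos; leftPos += 1; d }
        tempVals(dest) = values(idx)
        tempIndices(dest) = indices(idx)
        idx += 1
      }
      // Copy the reordered range back into the column's arrays.
      System.arraycopy(tempVals, from, values, from, to - from)
      System.arraycopy(tempIndices, from, indices, from, to - from)
    }
]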

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-16 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r144787368
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, 
returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting 
tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the 
passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+  input: Array[TreePoint],
+  instanceWeights: Array[Double],
+  node: LearningNode,
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// The case with 1 node (depth = 0) is handled separately.
+// This allows all iterations in the depth > 0 case to use the same 
code.
+// TODO: Check that learning works when maxDepth > 0 but learning 
stops at 1 node (because of
+//   other parameters).
+if (metadata.maxDepth == 0) {
+  return node.toNode
+}
+
+// Prepare column store.
+//   Note: rowToColumnStoreDense checks to make sure numRows < 
Int.MaxValue.
+val colStoreInit: Array[Array[Int]]
+= 
LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+val labels = input.map(_.label)
+
+// Fit a regression model on the dataset, throwing an error if 
metadata indicates that
+// we should train a classifier.
+// TODO: Add support for training classifiers
+if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+  throw new UnsupportedOperationException("Local training of a 
decision tree classifier is " +
+"unsupported; currently, only regression is supported")
+} else {
+  trainRegressor(node, colStoreInit, instanceWeights, labels, 
metadata, splits)
+}
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the 
same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in 
dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+  rootNode: LearningNode,
+  colStoreInit: Array[Array[Int]],
+  instanceWeights: Array[Double],
+  labels: Array[Double],
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// Sort each column by decision tree node.
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { 
case (col, featureIndex) =>
+  val featureArity: Int = 
metadata.featureArity.getOrElse(featureIndex, 0)
+  FeatureVector(featureIndex, featureArity, col)
+}
+
+val numRows = colStore.headOption match {
+  case None => 0
+  case Some(column) => column.values.length
+}
+
+// Create a new TrainingInfo describing the status of our partially-trained
+// subtree at each iteration of training
+var trainingInfo: TrainingInfo = 

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-16 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r144788264
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, 
returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting 
tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the 
passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+  input: Array[TreePoint],
+  instanceWeights: Array[Double],
+  node: LearningNode,
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// The case with 1 node (depth = 0) is handled separately.
+// This allows all iterations in the depth > 0 case to use the same 
code.
+// TODO: Check that learning works when maxDepth > 0 but learning 
stops at 1 node (because of
+//   other parameters).
+if (metadata.maxDepth == 0) {
+  return node.toNode
+}
+
+// Prepare column store.
+//   Note: rowToColumnStoreDense checks to make sure numRows < 
Int.MaxValue.
+val colStoreInit: Array[Array[Int]]
+= 
LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+val labels = input.map(_.label)
+
+// Fit a regression model on the dataset, throwing an error if 
metadata indicates that
+// we should train a classifier.
+// TODO: Add support for training classifiers
+if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+  throw new UnsupportedOperationException("Local training of a 
decision tree classifier is " +
+"unsupported; currently, only regression is supported")
+} else {
+  trainRegressor(node, colStoreInit, instanceWeights, labels, 
metadata, splits)
+}
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the 
same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in 
dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+  rootNode: LearningNode,
+  colStoreInit: Array[Array[Int]],
+  instanceWeights: Array[Double],
+  labels: Array[Double],
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// Sort each column by decision tree node.
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { 
case (col, featureIndex) =>
+  val featureArity: Int = 
metadata.featureArity.getOrElse(featureIndex, 0)
+  FeatureVector(featureIndex, featureArity, col)
+}
+
+val numRows = colStore.headOption match {
+  case None => 0
+  case Some(column) => column.values.length
+}
+
+// Create a new TrainingInfo describing the status of our partially-trained
+// subtree at each iteration of training
+var trainingInfo: TrainingInfo = 

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-16 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r144790803
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/TrainingInfo.scala ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.ml.tree.{LearningNode, Split}
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Maintains intermediate state of data (columns) and tree during local 
tree training.
+ * Primary local tree training data structure; contains all information 
required to describe
+ * the state of the algorithm at any point during learning.
+ *
+ * Nodes are indexed left-to-right along the periphery of the tree, with 
0-based indices.
+ * The "periphery" is the set of leaf nodes (active and inactive).
+ *
+ * @param columns  Array of columns.
+ * Each column is sorted first by nodes (left-to-right 
along the tree periphery);
+ * all columns share this first level of sorting.
+ * Within each node's group, each column is sorted based 
on feature value;
+ * this second level of sorting differs across columns.
+ * @param instanceWeights Array of weights for each training example
+ * @param nodeOffsets  Offsets into the columns indicating the first level 
of sorting (by node).
+ * The rows corresponding to the node activeNodes(i) 
are in the range
+ * [nodeOffsets(i)(0), nodeOffsets(i)(1)).
+ * @param activeNodes  Nodes which are active (still being split).
+ * Inactive nodes are known to be leaves in the final 
tree.
+ */
+private[impl] case class TrainingInfo(
+columns: Array[FeatureVector],
+instanceWeights: Array[Double],
+nodeOffsets: Array[(Int, Int)],
+activeNodes: Array[LearningNode]) extends Serializable {
+
+  // pre-allocated temporary buffers that we use to sort
+  // instances in left and right children during update
+  val tempVals: Array[Int] = new Array[Int](columns(0).values.length)
+  val tempIndices: Array[Int] = new Array[Int](columns(0).values.length)
+
+  /** For debugging */
+  override def toString: String = {
+"PartitionInfo(" +
+  "  columns: {\n" +
+  columns.mkString(",\n") +
+  "  },\n" +
+  s"  nodeOffsets: ${nodeOffsets.mkString(", ")},\n" +
+  s"  activeNodes: ${activeNodes.iterator.mkString(", ")},\n" +
+  ")\n"
+  }
+
+  /**
+   * Update columns and nodeOffsets for the next level of the tree.
+   *
+   * Update columns:
+   *   For each (previously) active node,
+   * Compute bitset indicating whether each training instance under 
the node splits left/right
+   * For each column,
+   *   Sort corresponding range of instances based on bitset.
+   * Update nodeOffsets, activeNodes:
+   *   Split offsets for nodes which split (which can be identified using 
the bitset).
+   *
+   * @return Updated partition info
+   */
+  def update(splits: Array[Array[Split]], newActiveNodes: 
Array[LearningNode]): TrainingInfo = {
+// Create buffers for storing our new arrays of node offsets & 
impurities
+val newNodeOffsets = new ArrayBuffer[(Int, Int)]()
+// Update (per-node) sorting of each column to account for creation of 
new nodes
+var nodeIdx = 0
+while (nodeIdx < activeNodes.length) {
+  val node = activeNodes(nodeIdx)
+  // Get new active node offsets from active nodes that were split
+  if (node.split.isDefined) {
+// Get split and FeatureVector corresponding to feature for split
+val split = node.split.get
+val col = columns(split.featureIndex)
+val (from, to) = nodeOffsets(nodeIdx)
+// Compute bitset indicating whether 
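[Archive note: truncated again here. The offset bookkeeping the doc comment
describes is simple once the columns have been reordered: a split node's row
range [from, to) becomes two adjacent child ranges. A small sketch
(splitOffsets is an invented name):

    def splitOffsets(from: Int, to: Int, numLeftRows: Int): ((Int, Int), (Int, Int)) = {
      val left = (from, from + numLeftRows)    // rows that went left
      val right = (from + numLeftRows, to)     // rows that went right
      (left, right)
    }

    // e.g. splitOffsets(0, 10, 4) == ((0, 4), (4, 10))
]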

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-16 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r144786233
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/FeatureVector.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Stores values for a single training data column (a single continuous or 
categorical feature).
+ *
+ * Values are currently stored in a dense representation only.
+ * TODO: Support sparse storage (to optimize deeper levels of the tree), 
and maybe compressed
+ *   storage (to optimize upper levels of the tree).
+ *
+ * TODO: Sort feature values to support more complicated splitting logic 
(e.g. considering every
+ *   possible continuous split instead of discretizing continuous 
features).
+ *
+ * NOTE: We could add sorting of feature values in this PR; the only 
change required would be to
+ * sort feature values at construction-time. Sorting might improve 
locality during stats
+ * aggregation (we'd frequently update the same O(statsSize) array for a 
(feature, bin),
+ * instead of frequently updating for the same feature).
+ *
+ * @param featureArity  For categorical features, this gives the number of 
categories.
+ *  For continuous features, this should be set to 0.
+ * @param rowIndices Optional: rowIndices(i) is the row index of the ith 
feature value (values(i))
+ *   If unspecified, feature values are assumed to be 
ordered by row (i.e. values(i)
+ *   is a feature value from the ith row).
+ */
+private[impl] class FeatureVector(
+val featureIndex: Int,
+val featureArity: Int,
+val values: Array[Int],
+private val rowIndices: Option[Array[Int]])
+  extends Serializable {
+  // Associates feature values with training point rows. indices(i) = 
training point index
+  // (row index) of ith feature value
+  val indices = rowIndices.getOrElse(values.indices.toArray)
+
+  def isCategorical: Boolean = featureArity > 0
+
+  /** For debugging */
+  override def toString: String = {
+"  FeatureVector(" +
+  s"featureIndex: $featureIndex,\n" +
+  s"featureType: ${if (featureArity == 0) "Continuous" else 
"Categorical"},\n" +
+  s"featureArity: $featureArity,\n" +
+  s"values: ${values.mkString(", ")},\n" +
+  s"indices: ${indices.mkString(", ")},\n" +
+  "  )"
+  }
+
+  def deepCopy(): FeatureVector =
+new FeatureVector(featureIndex, featureArity, values.clone(), 
Some(indices.clone()))
+
+  override def equals(other: Any): Boolean = {
+other match {
+  case o: FeatureVector =>
+featureIndex == o.featureIndex && featureArity == o.featureArity &&
+  values.sameElements(o.values) && indices.sameElements(o.indices)
+  case _ => false
+}
+  }
+
+  /**
+   * Reorders the subset of feature values at indices [from, to) in the 
passed-in column
+   * according to the split information encoded in instanceBitVector 
(feature values for rows
+   * that split left appear before feature values for rows that split 
right).
+   *
+   * @param numLeftRows Number of rows on the left side of the split
+   * @param tempVals Destination buffer for reordered feature values
+   * @param tempIndices Destination buffer for row indices corresponding 
to reordered feature values
+   * @param instanceBitVector instanceBitVector(i) = true if the row for 
the ith feature
+   *  value splits right, false otherwise
+   */
+  private[ml] def updateForSplit(
+  from: Int,
+  to: Int,
+  numLeftRows: Int,
+  tempVals: Array[Int],
+  tempIndices: Array[Int],
+  instanceBitVector: BitSet): Unit = {
+
+// 

[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-16 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r144790384
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/AggUpdateUtils.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.Split
+
+/**
+ * Helpers for updating DTStatsAggregators during collection of sufficient 
stats for tree training.
+ */
+private[impl] object AggUpdateUtils {
+
+  /**
+   * Updates the parent node stats of the passed-in impurity aggregator 
with the labels
+   * corresponding to the feature values at indices [from, to).
+   */
+  private[impl] def updateParentImpurity(
+  statsAggregator: DTStatsAggregator,
+  col: FeatureVector,
+  from: Int,
+  to: Int,
+  instanceWeights: Array[Double],
+  labels: Array[Double]): Unit = {
+from.until(to).foreach { idx =>
+  val rowIndex = col.indices(idx)
+  val label = labels(rowIndex)
+  statsAggregator.updateParent(label, instanceWeights(rowIndex))
+}
+  }
+
+  /**
+   * Update aggregator for an (unordered feature, label) pair
+   * @param splits Array of arrays of splits for each feature; splits(i) = 
splits for feature i.
+   */
+  private[impl] def updateUnorderedFeature(
+  agg: DTStatsAggregator,
+  featureValue: Int,
+  label: Double,
+  featureIndex: Int,
+  featureIndexIdx: Int,
+  splits: Array[Array[Split]],
--- End diff --

You only need to pass in `featureSplit: Array[Split]`; don't pass the splits 
for all features.
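
For example (sketch only; the parameters after `splits` are not shown in the 
quote above, so they are elided here too):

    private[impl] def updateUnorderedFeature(
        agg: DTStatsAggregator,
        featureValue: Int,
        label: Double,
        featureIndex: Int,
        featureIndexIdx: Int,
        featureSplits: Array[Split]  // was: splits: Array[Array[Split]]
        /* remaining parameters unchanged */): Unit = {
      // Body unchanged, except it uses featureSplits directly where it
      // previously looked up splits(featureIndex).
      ???
    }

Call sites would then pass `splits(featureIndex)`.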


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-16 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r144789990
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/AggUpdateUtils.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree.Split
+
+/**
+ * Helpers for updating DTStatsAggregators during collection of sufficient 
stats for tree training.
+ */
+private[impl] object AggUpdateUtils {
+
+  /**
+   * Updates the parent node stats of the passed-in impurity aggregator 
with the labels
+   * corresponding to the feature values at indices [from, to).
+   */
+  private[impl] def updateParentImpurity(
+  statsAggregator: DTStatsAggregator,
+  col: FeatureVector,
--- End diff --

Actually, `updateParentImpurity` has no real relation to any feature column: 
the column is passed in only so that its `indices` array can be used, so 
passing any feature column would work. This looks weird; maybe it could be 
designed better.
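
For example, it could take the row-index array directly (signature invented 
for illustration):

    // Hypothetical redesign: take the row indices directly, since that is
    // the only thing updateParentImpurity uses the column for.
    private[impl] def updateParentImpurity(
        statsAggregator: DTStatsAggregator,
        rowIndices: Array[Int],  // was: col: FeatureVector (only col.indices used)
        from: Int,
        to: Int,
        instanceWeights: Array[Double],
        labels: Array[Double]): Unit = {
      var idx = from
      while (idx < to) {
        val rowIndex = rowIndices(idx)
        statsAggregator.updateParent(labels(rowIndex), instanceWeights(rowIndex))
        idx += 1
      }
    }

Callers would pass `col.indices` explicitly.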


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-16 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r144789015
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/TrainingInfo.scala ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.ml.tree.{LearningNode, Split}
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Maintains intermediate state of data (columns) and tree during local 
tree training.
+ * Primary local tree training data structure; contains all information 
required to describe
+ * the state of the algorithm at any point during learning.
+ *
+ * Nodes are indexed left-to-right along the periphery of the tree, with 
0-based indices.
+ * The "periphery" is the set of leaf nodes (active and inactive).
+ *
+ * @param columns  Array of columns.
+ * Each column is sorted first by nodes (left-to-right 
along the tree periphery);
+ * all columns share this first level of sorting.
+ * Within each node's group, each column is sorted based 
on feature value;
+ * this second level of sorting differs across columns.
+ * @param instanceWeights Array of weights for each training example
+ * @param nodeOffsets  Offsets into the columns indicating the first level 
of sorting (by node).
+ * The rows corresponding to the node activeNodes(i) 
are in the range
+ * [nodeOffsets(i)(0), nodeOffsets(i)(1)).
+ * @param activeNodes  Nodes which are active (still being split).
+ * Inactive nodes are known to be leaves in the final 
tree.
+ */
+private[impl] case class TrainingInfo(
+columns: Array[FeatureVector],
+instanceWeights: Array[Double],
--- End diff --

The `instanceWeights` array is never updated between iterations, so why put 
it in the `TrainingInfo` structure?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org