[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2017-01-23 Thread smurching
Github user smurching closed the pull request at:

https://github.com/apache/spark/pull/14872





[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-09-27 Thread smurching
GitHub user smurching reopened a pull request:

https://github.com/apache/spark/pull/14872

[SPARK-3162][MLlib][WIP] Add local tree training for decision tree regressors

## What changes were proposed in this pull request?

Based on [Yggdrasil](https://github.com/fabuzaid21/yggdrasil), this PR adds local training of decision tree regressors.

Some classes/objects largely correspond to Yggdrasil classes/objects.
Specifically:
* class LocalDecisionTreeRegressor --> class YggdrasilRegressor
* object LocalDecisionTree --> object YggdrasilRegression
* object LocalDecisionTreeUtils --> object Yggdrasil

## How was this patch tested?

Added unit tests in `ml/tree/impl/LocalTreeTrainingSuite.scala` verifying that local and distributed training of a decision tree regressor produce the same tree.
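
For illustration, a hedged usage sketch: this assumes `LocalDecisionTreeRegressor` follows the standard spark.ml Estimator conventions (the package, the setter, and the `fit` signature below are assumptions modeled on `DecisionTreeRegressor`, not confirmed by this PR):

```scala
import org.apache.spark.sql.SparkSession

object LocalTreeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("local-trees").getOrCreate()
    val df = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

    // Assumed API, mirroring ml.regression.DecisionTreeRegressor:
    val localDt = new LocalDecisionTreeRegressor().setMaxDepth(5)
    val model = localDt.fit(df)
    println(model.toDebugString) // assuming the fitted model mirrors DecisionTreeRegressionModel

    spark.stop()
  }
}
```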



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smurching/spark local-trees-pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14872.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #14872


commit acf5b3e29a346a0cb86f621269855a6a98a9a74e
Author: Siddharth Murching 
Date:   2016-08-29T23:51:33Z

Add local tree training for decision tree regressors

commit aa4fcc8d401385f38fe0cdfdb9fe39062c3a9f96
Author: Siddharth Murching 
Date:   2016-08-30T01:19:07Z

Fix setting of impurity values for leaf nodes to match values produced by distributed Random Forest algorithm

commit f273fc6a4b5048ae577d03676def354dce5c87a7
Author: Siddharth Murching 
Date:   2016-08-31T07:01:26Z

WIP refactoring single-machine tree code

commit 5e61e3b29c236d27e0d655d15a48f2fe3e13d26a
Author: Siddharth Murching 
Date:   2016-09-01T01:22:48Z

Remove unused imports, remove array of single-node impurity aggregators

commit d2060fc460a97228a36bf81956cf8dd24c83106e
Author: Siddharth Murching 
Date:   2016-09-01T23:11:17Z

WIP

commit 634a3223374608d68018daac5500a429034bbc20
Author: Siddharth Murching 
Date:   2016-09-02T00:21:10Z

More work, tests still pass

commit eb7fde00e0db5aa5d04951f8f4a9cd62204f1609
Author: Siddharth Murching 
Date:   2016-09-02T17:02:06Z

WIP: Added tests for classes upon which local tree training is dependent. Some integration tests fail

commit b748f05e3eaa7d58b1ad86d269e0dda5f35ee885
Author: Siddharth Murching 
Date:   2016-09-02T17:37:31Z

WIP debugging

commit 297052242727e6693ccbacf89f44b3ff6db584f7
Author: Siddharth Murching 
Date:   2016-09-02T21:34:13Z

Consolidate checking for valid splits

commit 8d443ce38f958e7b83b502e614e01c824cb63c4b
Author: Siddharth Murching 
Date:   2016-09-02T21:52:47Z

Delete empty test suite

commit ee56ffe98756ed78cefbc3f782a471f04e80b256
Author: Siddharth Murching 
Date:   2016-09-02T22:49:38Z

Fix some style errors







[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-09-02 Thread smurching
Github user smurching closed the pull request at:

https://github.com/apache/spark/pull/14872





[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-08-30 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/14872#discussion_r76853720
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/PartitionInfo.scala ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Intermediate data stored during learning.
+ * TODO(smurching): Rename; maybe TrainingInfo?
+ *
+ * Node indexing for nodeOffsets, activeNodes:
+ * Nodes are indexed left-to-right along the periphery of the tree, with 0-based indices.
+ * The periphery is the set of leaf nodes (active and inactive).
+ *
+ * @param columns  Array of columns.
+ *                 Each column is sorted first by nodes (left-to-right along the tree periphery);
+ *                 all columns share this first level of sorting.
+ *                 Within each node's group, each column is sorted based on feature value;
+ *                 this second level of sorting differs across columns.
+ * @param nodeOffsets  Offsets into the columns indicating the first level of sorting (by node).
+ *                     The rows corresponding to node i are in the range
+ *                     [nodeOffsets(i), nodeOffsets(i+1)).
+ * @param activeNodes  Nodes which are active (still being split).
+ *                     Inactive nodes are known to be leaves in the final tree.
+ */
+private[impl] case class PartitionInfo(
+    columns: Array[FeatureVector],
+    nodeOffsets: Array[Int],
+    activeNodes: BitSet,
+    fullImpurityAggs: Array[ImpurityAggregatorSingle]) extends Serializable {
+
+  // pre-allocated temporary buffers that we use to sort
+  // instances in left and right children during update
+  val tempVals: Array[Int] = new Array[Int](columns(0).values.length)
+  val tempIndices: Array[Int] = new Array[Int](columns(0).values.length)
+
+  /** For debugging */
+  override def toString: String = {
+"PartitionInfo(" +
+  "  columns: {\n" +
+  columns.mkString(",\n") +
+  "  },\n" +
+  s"  nodeOffsets: ${nodeOffsets.mkString(", ")},\n" +
+  s"  activeNodes: ${activeNodes.iterator.mkString(", ")},\n" +
+  ")\n"
+  }
+
+  /**
+   * Update columns and nodeOffsets for the next level of the tree.
+   *
+   * Update columns:
+   *   For each column,
+   *     For each (previously) active node,
+   *       Sort corresponding range of instances based on bit vector.
+   * Update nodeOffsets, activeNodes:
+   *   Split offsets for nodes which split (which can be identified using the bit vector).
+   *
+   * @param instanceBitVector  Bit vector encoding splits for the next level of the tree.
+   *                           These must follow a 2-level ordering, where the first level is
+   *                           by node and the second level is by row index.
+   *                           bitVector(i) = false iff instance i goes to the left child.
+   *                           For instances at inactive (leaf) nodes, the value can be arbitrary.
+   * @return Updated partition info
+   */
+  def update(
+      instanceBitVector: BitSet,
+      newNumNodeOffsets: Int,
+      labels: Array[Byte],
+      metadata: DecisionTreeMetadata): PartitionInfo = {
+    // Create a 2-level representation of the new nodeOffsets (to be flattened).
+    // These 2 levels correspond to original nodes and their children (if split).
+    val newNodeOffsets = nodeOffsets.map(Array(_))
+    val newFullImpurityAggs = fullImpurityAggs.map(Array(_))
+
+    val newColumns = columns.zipWithIndex.map { case (col, index) =>
+      index match {
+        case 0 => first(col, instanceBitVector, metadata,
+          labels, newNodeOffsets, newFullImpurityAggs)
+        case _ => rest(col, instanceBitVector, newNodeOffsets)
+      }
+  
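
To make PartitionInfo's two-level ordering and bit-vector update concrete, here is a toy, self-contained sketch (all names are hypothetical stand-ins, not the PR's classes): rows are grouped by node via nodeOffsets, sorted by feature value within each node's range, and update() stably partitions each active node's range into left/right children.

```scala
object PartitionInfoSketch {
  def main(args: Array[String]): Unit = {
    // One column of binned feature values for 6 rows; rows 0-2 belong to node 0,
    // rows 3-5 to node 1: node i occupies [nodeOffsets(i), nodeOffsets(i+1)).
    val nodeOffsets = Array(0, 3, 6)
    val column = Array(1, 3, 5, 2, 4, 6) // already sorted within each node's range

    // Bit vector for node 0's split: bit(i) == false means row i goes to the left child.
    val goesRight = Array(false, true, false)

    // Stable partition of node 0's range preserves the sort order within each child.
    val (left, right) = column.slice(0, 3).zip(goesRight).partition(!_._2)
    println(left.map(_._1).mkString(", "))  // 1, 5 -> rows of the new left child
    println(right.map(_._1).mkString(", ")) // 3    -> rows of the new right child
    // New offsets after splitting node 0 would be Array(0, 2, 3, 6).
  }
}
```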

[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-08-30 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/14872#discussion_r76853460
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/PartitionInfo.scala ---
@@ -0,0 +1,422 @@
+private[impl] case class PartitionInfo(
+    columns: Array[FeatureVector],
+    nodeOffsets: Array[Int],
+    activeNodes: BitSet,
+    fullImpurityAggs: Array[ImpurityAggregatorSingle]) extends Serializable {
--- End diff --

Don't store this.  Just compute it when first choosing a split for a node, and reuse those stats as the fullImpurityAggs for all features for that node.
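
A hedged sketch of this suggestion (names are hypothetical): compute a node's full impurity stats once, when the first split candidate for that node is evaluated, and memoize them for the node's remaining features instead of storing fullImpurityAggs in PartitionInfo.

```scala
import scala.collection.mutable

// Hypothetical memoization helper; Array[Double] stands in for the real
// ImpurityAggregatorSingle stats.
final class NodeImpurityCache {
  private val cache = mutable.Map.empty[Int, Array[Double]]

  // `compute` stands in for aggregating label stats over the node's row range;
  // it runs once per node, on the first feature evaluated for that node.
  def fullStatsFor(nodeIdx: Int)(compute: => Array[Double]): Array[Double] =
    cache.getOrElseUpdate(nodeIdx, compute)
}
```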





[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-08-30 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/14872#discussion_r76853452
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/PartitionInfo.scala ---
@@ -0,0 +1,422 @@
+private[impl] case class PartitionInfo(
+    columns: Array[FeatureVector],
+    nodeOffsets: Array[Int],
+    activeNodes: BitSet,
--- End diff --

not needed; use actual nodes
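
One hedged reading of this suggestion, with stub types standing in for the PR's FeatureVector and LearningNode (an interpretation, not code from the PR): track references to the actual peripheral nodes rather than a BitSet of peripheral indices, so no index bookkeeping is needed to mark leaves.

```scala
// Stub types for illustration only.
final case class FeatureVectorStub(values: Array[Int])
final class LearningNodeStub(val id: Int, var isLeaf: Boolean = false)

// Hypothetical replacement shape for PartitionInfo, per the review comment.
final case class TrainingInfoSketch(
    columns: Array[FeatureVectorStub],
    nodeOffsets: Array[Int],
    activeNodes: Array[LearningNodeStub]) // actual nodes instead of a BitSet
```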





[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-08-30 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/14872#discussion_r76853370
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+import org.apache.spark.util.collection.BitSet
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset.
+   * TODO(smurching): Accept a seed for feature subsampling
+   *
+   * @param node LearningNode to split
+   */
+  def fitNode(
+      input: Array[BaggedPoint[TreePoint]],
+      node: LearningNode,
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // The case with 1 node (depth = 0) is handled separately.
+    // This allows all iterations in the depth > 0 case to use the same code.
+    // TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node
+    //       (because of other parameters).
+    if (metadata.maxDepth == 0) {
+      val impurityAggregator: ImpurityAggregatorSingle =
+        input.aggregate(metadata.createImpurityAggregator())(
+          (agg, lp) => agg.update(lp.datum.label, 1.0),
+          (agg1, agg2) => agg1.add(agg2))
+      val impurityCalculator = impurityAggregator.getCalculator
+      return new LeafNode(LocalDecisionTreeUtils.getPredict(impurityCalculator).predict,
+        impurityCalculator.calculate(), impurityCalculator)
+    }
+
+    // Prepare column store.
+    //   Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue.
+    val colStoreInit: Array[Array[Int]] =
+      LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.datum.binnedFeatures))
+    val labels = input.map(_.datum.label)
+
+    // numClasses in (1, 32] indicates classification, which local training does not
+    // yet support; otherwise fit a regression model on the dataset.
+    if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+      throw new UnsupportedOperationException("Local training of a decision tree classifier is " +
+        "unsupported; currently, only regression is supported")
+    } else {
+      // TODO(smurching): Pass an array of instanceWeights extracted from the input BaggedPoint?
+      // Also, pass seed for feature subsampling
+      trainRegressor(node, colStoreInit, labels, metadata, splits)
+    }
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor seems the same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier?
+   *
+   * @param rootNode Node to fit on the passed-in dataset
+   * @param colStoreInit Array of columns of training data
+   * @param metadata Metadata object
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return
+   */
+  def trainRegressor(
+      rootNode: LearningNode,
+      colStoreInit: Array[Array[Int]],
+      labels: Array[Double],
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // Sort each column by feature values.
+    val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map {
+      case (col, featureIndex) =>
+        val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0)
+        FeatureVector.fromOriginal(featureIndex, featureArity, col)
+    }
+
+    val numRows = colStore.headOption match {
+      case None => 0
+      case Some(column) => column.values.size
+    }
+
+// Create an impurityAggregator object containing info 
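
The aggregate fold used in fitNode's depth-0 case above can be illustrated with a simplified variance aggregator (a stand-in for MLlib's ImpurityAggregatorSingle, not the real class): accumulate label statistics with `aggregate`, then derive the prediction and impurity from the totals.

```scala
object VarianceAggSketch {
  final case class VarianceAgg(count: Double = 0, sum: Double = 0, sumSq: Double = 0) {
    def update(label: Double, weight: Double): VarianceAgg =
      VarianceAgg(count + weight, sum + weight * label, sumSq + weight * label * label)
    def add(other: VarianceAgg): VarianceAgg =
      VarianceAgg(count + other.count, sum + other.sum, sumSq + other.sumSq)
    def predict: Double = sum / count                        // mean label
    def impurity: Double = sumSq / count - predict * predict // variance
  }

  def main(args: Array[String]): Unit = {
    val labels = Array(1.0, 2.0, 3.0)
    // Same fold shape as fitNode's input.aggregate(...) call.
    val agg = labels.aggregate(VarianceAgg())((a, l) => a.update(l, 1.0), _ add _)
    println(s"predict=${agg.predict}, impurity=${agg.impurity}") // predict=2.0, impurity~=0.667
  }
}
```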

[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-08-30 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/14872#discussion_r76853423
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTreeUtils.scala ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.classification.DecisionTreeClassificationModel
+import org.apache.spark.ml.regression.DecisionTreeRegressionModel
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
+import org.apache.spark.mllib.tree.impurity._
+import org.apache.spark.mllib.tree.model.{ImpurityStats, Predict}
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * DecisionTree which partitions data by feature.
+ *
+ * Algorithm:
+ *  - Repartition data, grouping by feature.
+ *  - Prep data (sort continuous features).
+ *  - On each partition, initialize instance--node map with each instance at root node.
+ *  - Iterate, training 1 new level of the tree at a time:
+ *     - On each partition, for each feature on the partition, select the best split
+ *       for each node.
+ *     - Aggregate best split for each node.
+ *     - Aggregate bit vector (1 bit/instance) indicating whether each instance splits
+ *       left or right.
+ *     - Broadcast bit vector.  On each partition, update instance--node map.
+ *
+ * TODO: Update to use a sparse column store.
+ */
+private[ml] object LocalDecisionTreeUtils extends Logging {
+
+  /**
+   * Convert a dataset of [[Vector]] from row storage to column storage.
+   * This can take any [[Vector]] type but stores data as [[DenseVector]].
+   *
+   * This maintains sparsity in the data.
+   *
+   * This maintains matrix structure.  I.e., each partition of the output RDD holds adjacent
+   * columns.  The number of partitions will be min(input RDD's number of partitions, numColumns).
+   *
+   * @param rowStore  An array of input data rows, each represented as an
+   *                  int array of binned feature values
+   * @return Transpose of rowStore with
+   *
+   * TODO: Add implementation for sparse data.
+   *       For sparse data, distribute more evenly based on number of non-zeros.
+   *       (First collect stats to decide how to partition.)
+   */
+  private[impl] def rowToColumnStoreDense(rowStore: Array[Array[Int]]): Array[Array[Int]] = {
+    // Compute the number of rows in the data
+    val numRows = {
+      val longNumRows: Long = rowStore.size
+      require(longNumRows < Int.MaxValue, s"rowToColumnStore given RDD with $longNumRows rows," +
+        s" but can handle at most ${Int.MaxValue} rows")
+      longNumRows.toInt
+    }
+
+    // Return an empty array for a dataset with zero rows or columns, otherwise
+    // return the transpose of the rowStore matrix
+    if (numRows == 0 || rowStore(0).size == 0) {
+      Array.empty
+    } else {
+      val numCols = rowStore(0).size
+      0.until(numCols).map { colIdx =>
+        rowStore.map(row => row(colIdx))
+      }.toArray
+    }
+  }
+
+  private[impl] def finalizeTree(
+      rootNode: Node,
+      algo: OldAlgo.Algo,
+      numClasses: Int,
+      numFeatures: Int,
+      parentUID: Option[String]): DecisionTreeModel = {
+    parentUID match {
+      case Some(uid) =>
+        if (algo == OldAlgo.Classification) {
+          new DecisionTreeClassificationModel(uid, rootNode, numFeatures = numFeatures,
+            numClasses = numClasses)
+        } else {
+          new DecisionTreeRegressionModel(uid, rootNode, numFeatures = numFeatures)
+        }
+      case None =>
+        if (algo == 
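
For illustration, a standalone sketch of the transpose that rowToColumnStoreDense performs, on a toy dataset (illustrative only; here rowStore(r)(c) is the binned value of feature c in row r):

```scala
object TransposeSketch {
  // Same idea as rowToColumnStoreDense above, minus the size check.
  def rowToColumnStoreDense(rowStore: Array[Array[Int]]): Array[Array[Int]] =
    if (rowStore.isEmpty || rowStore(0).isEmpty) Array.empty
    else Array.tabulate(rowStore(0).length)(colIdx => rowStore.map(row => row(colIdx)))

  def main(args: Array[String]): Unit = {
    val rows = Array(Array(1, 2), Array(3, 4), Array(5, 6)) // 3 rows x 2 features
    val cols = rowToColumnStoreDense(rows)
    cols.foreach(c => println(c.mkString(", "))) // prints "1, 3, 5" then "2, 4, 6"
  }
}
```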

[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-08-30 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/14872#discussion_r76853386
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---

[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...

2016-08-30 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/14872#discussion_r76853239
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---