[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user smurching closed the pull request at: https://github.com/apache/spark/pull/14872 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
GitHub user smurching reopened a pull request:

    https://github.com/apache/spark/pull/14872

    [SPARK-3162][MLlib][WIP] Add local tree training for decision tree regressors

    ## What changes were proposed in this pull request?

    Based on [Yggdrasil](https://github.com/fabuzaid21/yggdrasil), added local training of decision tree regressors.

    Some classes/objects largely correspond to Yggdrasil classes/objects. Specifically:
    * class LocalDecisionTreeRegressor --> class YggdrasilRegressor
    * object LocalDecisionTree --> object YggdrasilRegression
    * object LocalDecisionTreeUtils --> object Yggdrasil

    ## How was this patch tested?

    Added unit tests in (ml/tree/impl/LocalTreeTrainingSuite.scala) verifying that local & distributed training of a decision tree regressor produces the same tree.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/smurching/spark local-trees-pr

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14872.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14872

commit acf5b3e29a346a0cb86f621269855a6a98a9a74e
Author: Siddharth Murching
Date:   2016-08-29T23:51:33Z

    Add local tree training for decision tree regressors

commit aa4fcc8d401385f38fe0cdfdb9fe39062c3a9f96
Author: Siddharth Murching
Date:   2016-08-30T01:19:07Z

    Fix setting of impurity values for leaf nodes to match values produced by distributed Random Forest algorithm

commit f273fc6a4b5048ae577d03676def354dce5c87a7
Author: Siddharth Murching
Date:   2016-08-31T07:01:26Z

    WIP refactoring single-machine tree code

commit 5e61e3b29c236d27e0d655d15a48f2fe3e13d26a
Author: Siddharth Murching
Date:   2016-09-01T01:22:48Z

    Remove unused imports, remove array of single-node impurity aggregators

commit d2060fc460a97228a36bf81956cf8dd24c83106e
Author: Siddharth Murching
Date:   2016-09-01T23:11:17Z

    WIP

commit 634a3223374608d68018daac5500a429034bbc20
Author: Siddharth Murching
Date:   2016-09-02T00:21:10Z

    More work, tests still pass

commit eb7fde00e0db5aa5d04951f8f4a9cd62204f1609
Author: Siddharth Murching
Date:   2016-09-02T17:02:06Z

    WIP: Added tests for classes upon which local tree training is dependent. Some integration tests fail

commit b748f05e3eaa7d58b1ad86d269e0dda5f35ee885
Author: Siddharth Murching
Date:   2016-09-02T17:37:31Z

    WIP debugging

commit 297052242727e6693ccbacf89f44b3ff6db584f7
Author: Siddharth Murching
Date:   2016-09-02T21:34:13Z

    Consolidate checking for valid splits

commit 8d443ce38f958e7b83b502e614e01c824cb63c4b
Author: Siddharth Murching
Date:   2016-09-02T21:52:47Z

    Delete empty test suite

commit ee56ffe98756ed78cefbc3f782a471f04e80b256
Author: Siddharth Murching
Date:   2016-09-02T22:49:38Z

    Fix some style errors
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user smurching closed the pull request at: https://github.com/apache/spark/pull/14872
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14872#discussion_r76853720

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/PartitionInfo.scala ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Intermediate data stored during learning.
+ * TODO(smurching): Rename; maybe TrainingInfo?
+ *
+ * Node indexing for nodeOffsets, activeNodes:
+ *   Nodes are indexed left-to-right along the periphery of the tree, with 0-based indices.
+ *   The periphery is the set of leaf nodes (active and inactive).
+ *
+ * @param columns Array of columns.
+ *                Each column is sorted first by nodes (left-to-right along the tree periphery);
+ *                all columns share this first level of sorting.
+ *                Within each node's group, each column is sorted based on feature value;
+ *                this second level of sorting differs across columns.
+ * @param nodeOffsets Offsets into the columns indicating the first level of sorting (by node).
+ *                    The rows corresponding to node i are in the range
+ *                    [nodeOffsets(i), nodeOffsets(i+1)).
+ * @param activeNodes Nodes which are active (still being split).
+ *                    Inactive nodes are known to be leaves in the final tree.
+ */
+private[impl] case class PartitionInfo(
+    columns: Array[FeatureVector],
+    nodeOffsets: Array[Int],
+    activeNodes: BitSet,
+    fullImpurityAggs: Array[ImpurityAggregatorSingle]) extends Serializable {
+
+  // pre-allocated temporary buffers that we use to sort
+  // instances in left and right children during update
+  val tempVals: Array[Int] = new Array[Int](columns(0).values.length)
+  val tempIndices: Array[Int] = new Array[Int](columns(0).values.length)
+
+  /** For debugging */
+  override def toString: String = {
+    "PartitionInfo(" +
+      " columns: {\n" +
+      columns.mkString(",\n") +
+      " },\n" +
+      s" nodeOffsets: ${nodeOffsets.mkString(", ")},\n" +
+      s" activeNodes: ${activeNodes.iterator.mkString(", ")},\n" +
+      ")\n"
+  }
+
+  /**
+   * Update columns and nodeOffsets for the next level of the tree.
+   *
+   * Update columns:
+   *   For each column,
+   *     For each (previously) active node,
+   *       Sort corresponding range of instances based on bit vector.
+   * Update nodeOffsets, activeNodes:
+   *   Split offsets for nodes which split (which can be identified using the bit vector).
+   *
+   * @param instanceBitVector Bit vector encoding splits for the next level of the tree.
+   *                          These must follow a 2-level ordering, where the first level is by node
+   *                          and the second level is by row index.
+   *                          bitVector(i) = false iff instance i goes to the left child.
+   *                          For instances at inactive (leaf) nodes, the value can be arbitrary.
+   * @return Updated partition info
+   */
+  def update(
+      instanceBitVector: BitSet,
+      newNumNodeOffsets: Int,
+      labels: Array[Byte],
+      metadata: DecisionTreeMetadata): PartitionInfo = {
+    // Create a 2-level representation of the new nodeOffsets (to be flattened).
+    // These 2 levels correspond to original nodes and their children (if split).
+    val newNodeOffsets = nodeOffsets.map(Array(_))
+    val newFullImpurityAggs = fullImpurityAggs.map(Array(_))
+
+    val newColumns = columns.zipWithIndex.map { case (col, index) =>
+      index match {
+        case 0 => first(col, instanceBitVector, metadata,
+          labels, newNodeOffsets, newFullImpurityAggs)
+        case _ => rest(col, instanceBitVector, newNodeOffsets)
+      }
+
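The `update` doc comment quoted above describes re-sorting each active node's range of rows by the split bit vector and then splitting that node's offsets. The following is only a minimal sketch of that idea, not the PR's implementation: `NodeRangeUpdateSketch`, `partitionRange` and the `goesRight` predicate are hypothetical names, the bit vector is simplified to a function of the row index, and every node is treated as active. Because the partition is stable, rows that were ordered by feature value within a node stay ordered within each child.

```scala
import scala.collection.mutable.ArrayBuffer

object NodeRangeUpdateSketch {

  /**
   * Stable-partitions rows(from until to) in place: rows whose bit is false (left child)
   * come first, rows whose bit is true (right child) follow, and each group keeps its
   * previous relative order. Returns the number of rows sent to the left child.
   */
  def partitionRange(rows: Array[Int], goesRight: Int => Boolean, from: Int, to: Int): Int = {
    // Array.partition preserves element order within both halves.
    val (left, right) = rows.slice(from, to).partition(r => !goesRight(r))
    Array.copy(left, 0, rows, from, left.length)
    Array.copy(right, 0, rows, from + left.length, right.length)
    left.length
  }

  /**
   * Re-sorts every node's range and returns the node offsets for the next level.
   * Assumes nodeOffsets has at least two entries (one node) and that all nodes are active.
   */
  def update(rows: Array[Int], nodeOffsets: Array[Int], goesRight: Int => Boolean): Array[Int] = {
    val newOffsets = ArrayBuffer(nodeOffsets.head)
    nodeOffsets.sliding(2).foreach { case Array(from, to) =>
      val numLeft = partitionRange(rows, goesRight, from, to)
      // A node gains an interior offset only if it split into two non-empty children.
      if (numLeft > 0 && numLeft < to - from) newOffsets += from + numLeft
      newOffsets += to
    }
    newOffsets.toArray
  }
}
```

For instance, with a single node holding rows `3, 0, 2, 1, 4` and rows 0 and 1 marked as going right, the range is rearranged to `3, 2, 4, 0, 1` and the offsets `0, 5` become `0, 3, 5`.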
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14872#discussion_r76853460

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/PartitionInfo.scala ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Intermediate data stored during learning.
+ * TODO(smurching): Rename; maybe TrainingInfo?
+ *
+ * Node indexing for nodeOffsets, activeNodes:
+ *   Nodes are indexed left-to-right along the periphery of the tree, with 0-based indices.
+ *   The periphery is the set of leaf nodes (active and inactive).
+ *
+ * @param columns Array of columns.
+ *                Each column is sorted first by nodes (left-to-right along the tree periphery);
+ *                all columns share this first level of sorting.
+ *                Within each node's group, each column is sorted based on feature value;
+ *                this second level of sorting differs across columns.
+ * @param nodeOffsets Offsets into the columns indicating the first level of sorting (by node).
+ *                    The rows corresponding to node i are in the range
+ *                    [nodeOffsets(i), nodeOffsets(i+1)).
+ * @param activeNodes Nodes which are active (still being split).
+ *                    Inactive nodes are known to be leaves in the final tree.
+ */
+private[impl] case class PartitionInfo(
+    columns: Array[FeatureVector],
+    nodeOffsets: Array[Int],
+    activeNodes: BitSet,
+    fullImpurityAggs: Array[ImpurityAggregatorSingle]) extends Serializable {
--- End diff --

Don't store this. Just compute it when first choosing a split for a node, and reuse those stats as the fullImpurityAggs for all features for that node.
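The suggestion above (compute a node's full impurity statistics once when choosing its split and reuse them for every feature) could look roughly like the sketch below. This is an illustration under stated assumptions rather than the PR's code: `Stats` is a hypothetical stand-in for `ImpurityAggregatorSingle` restricted to variance impurity, `bestSplit` and `chooseSplits` are made-up names, and ties between equal feature values are ignored.

```scala
object NodeStatsReuseSketch {

  /** Sufficient statistics for variance impurity: count, sum and sum of squares of labels. */
  final case class Stats(count: Double, sum: Double, sumSq: Double) {
    def +(y: Double): Stats = Stats(count + 1, sum + y, sumSq + y * y)
    def -(other: Stats): Stats =
      Stats(count - other.count, sum - other.sum, sumSq - other.sumSq)
    /** Sum of squared deviations from the mean (variance times count). */
    def sse: Double = if (count == 0) 0.0 else sumSq - sum * sum / count
  }

  val empty: Stats = Stats(0, 0, 0)

  /**
   * Scans one feature. labelsSorted holds the node's labels ordered by that feature's values.
   * Right-child stats are derived as full - left, so the full-node stats are neither
   * recomputed nor stored per feature.
   * Returns (number of rows sent left, summed SSE of the two children).
   */
  def bestSplit(labelsSorted: Array[Double], full: Stats): (Int, Double) = {
    var left = empty
    var best = (0, full.sse) // (0, ...) means "no split improves on the parent"
    for (i <- 0 until labelsSorted.length - 1) {
      left = left + labelsSorted(i)
      val cost = left.sse + (full - left).sse
      if (cost < best._2) best = (i + 1, cost)
    }
    best
  }

  /** Computes the node's stats once, then reuses them for every feature of that node. */
  def chooseSplits(labelsByFeature: Array[Array[Double]]): Array[(Int, Double)] = {
    val full = labelsByFeature.head.foldLeft(empty)(_ + _)
    labelsByFeature.map(bestSplit(_, full))
  }
}
```

The design point is that `full` is a local value inside `chooseSplits`, so nothing like `fullImpurityAggs` has to live in `PartitionInfo` between levels.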
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14872#discussion_r76853452

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/PartitionInfo.scala ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Intermediate data stored during learning.
+ * TODO(smurching): Rename; maybe TrainingInfo?
+ *
+ * Node indexing for nodeOffsets, activeNodes:
+ *   Nodes are indexed left-to-right along the periphery of the tree, with 0-based indices.
+ *   The periphery is the set of leaf nodes (active and inactive).
+ *
+ * @param columns Array of columns.
+ *                Each column is sorted first by nodes (left-to-right along the tree periphery);
+ *                all columns share this first level of sorting.
+ *                Within each node's group, each column is sorted based on feature value;
+ *                this second level of sorting differs across columns.
+ * @param nodeOffsets Offsets into the columns indicating the first level of sorting (by node).
+ *                    The rows corresponding to node i are in the range
+ *                    [nodeOffsets(i), nodeOffsets(i+1)).
+ * @param activeNodes Nodes which are active (still being split).
+ *                    Inactive nodes are known to be leaves in the final tree.
+ */
+private[impl] case class PartitionInfo(
+    columns: Array[FeatureVector],
+    nodeOffsets: Array[Int],
+    activeNodes: BitSet,
--- End diff --

not needed; use actual nodes
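One possible reading of "use actual nodes" is that the training state keeps the periphery nodes themselves next to their row ranges, so whether a node is still being split is read from the node rather than from a separate `BitSet`. The sketch below illustrates that reading only; `SketchNode`, `NodeRange` and `activeRanges` are hypothetical names, not classes from the PR, and the reviewer may have intended something different.

```scala
object ActiveNodeSketch {

  /** Hypothetical stand-in for a tree node under construction. */
  final class SketchNode(val id: Int) {
    var isLeaf: Boolean = false
  }

  /** Rows of `node` occupy [from, until) in every column. */
  final case class NodeRange(node: SketchNode, from: Int, until: Int)

  /** Nodes still being split are read off the nodes themselves; no BitSet is kept. */
  def activeRanges(periphery: Seq[NodeRange]): Seq[NodeRange] =
    periphery.filterNot(_.node.isLeaf)
}
```

Marking a node as a leaf is then the only bookkeeping needed to drop it from later levels.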
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14872#discussion_r76853370

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+import org.apache.spark.util.collection.BitSet
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset.
+   * TODO(smurching): Accept a seed for feature subsampling
+   *
+   * @param node LearningNode to split
+   */
+  def fitNode(
+      input: Array[BaggedPoint[TreePoint]],
+      node: LearningNode,
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // The case with 1 node (depth = 0) is handled separately.
+    // This allows all iterations in the depth > 0 case to use the same code.
+    // TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of
+    // other parameters).
+    if (metadata.maxDepth == 0) {
+      val impurityAggregator: ImpurityAggregatorSingle =
+        input.aggregate(metadata.createImpurityAggregator())(
+          (agg, lp) => agg.update(lp.datum.label, 1.0),
+          (agg1, agg2) => agg1.add(agg2))
+      val impurityCalculator = impurityAggregator.getCalculator
+      return new LeafNode(LocalDecisionTreeUtils.getPredict(impurityCalculator).predict,
+        impurityCalculator.calculate(), impurityCalculator)
+    }
+
+    // Prepare column store.
+    // Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue.
+    val colStoreInit: Array[Array[Int]]
+      = LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.datum.binnedFeatures))
+    val labels = input.map(_.datum.label)
+
+    // Train classifier if numClasses is between 1 and 32, otherwise fit a regression model
+    // on the dataset
+    if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+      throw new UnsupportedOperationException("Local training of a decision tree classifier is" +
+        "unsupported; currently, only regression is supported")
+    } else {
+      // TODO(smurching): Pass an array of instanceWeights extracted from the input BaggedPoint?
+      // Also, pass seed for feature subsampling
+      trainRegressor(node, colStoreInit, labels, metadata, splits)
+    }
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor seems the same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier?
+   *
+   * @param rootNode Node to fit on the passed-in dataset
+   * @param colStoreInit Array of columns of training data
+   * @param metadata Metadata object
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return
+   */
+  def trainRegressor(
+      rootNode: LearningNode,
+      colStoreInit: Array[Array[Int]],
+      labels: Array[Double],
+      metadata: DecisionTreeMetadata,
+      splits: Array[Array[Split]]): Node = {
+
+    // Sort each column by feature values.
+    val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) =>
+      val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0)
+      FeatureVector.fromOriginal(featureIndex, featureArity, col)
+    }
+
+    val numRows = colStore.headOption match {
+      case None => 0
+      case Some(column) => column.values.size
+    }
+
+    // Create an impurityAggregator object containing info
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14872#discussion_r76853423

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTreeUtils.scala ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.classification.DecisionTreeClassificationModel
+import org.apache.spark.ml.regression.DecisionTreeRegressionModel
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
+import org.apache.spark.mllib.tree.impurity._
+import org.apache.spark.mllib.tree.model.{ImpurityStats, Predict}
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * DecisionTree which partitions data by feature.
+ *
+ * Algorithm:
+ *  - Repartition data, grouping by feature.
+ *  - Prep data (sort continuous features).
+ *  - On each partition, initialize instance--node map with each instance at root node.
+ *  - Iterate, training 1 new level of the tree at a time:
+ *     - On each partition, for each feature on the partition, select the best split for each node.
+ *     - Aggregate best split for each node.
+ *     - Aggregate bit vector (1 bit/instance) indicating whether each instance splits
+ *       left or right.
+ *     - Broadcast bit vector. On each partition, update instance--node map.
+ *
+ * TODO: Update to use a sparse column store.
+ */
+private[ml] object LocalDecisionTreeUtils extends Logging {
+
+  /**
+   * Convert a dataset of [[Vector]] from row storage to column storage.
+   * This can take any [[Vector]] type but stores data as [[DenseVector]].
+   *
+   * This maintains sparsity in the data.
+   *
+   * This maintains matrix structure. I.e., each partition of the output RDD holds adjacent
+   * columns. The number of partitions will be min(input RDD's number of partitions, numColumns).
+   *
+   * @param rowStore An array of input data rows, each represented as an
+   *                 int array of binned feature values
+   * @return Transpose of rowStore with
+   *
+   * TODO: Add implementation for sparse data.
+   *       For sparse data, distribute more evenly based on number of non-zeros.
+   *       (First collect stats to decide how to partition.)
+   */
+  private[impl] def rowToColumnStoreDense(rowStore: Array[Array[Int]]): Array[Array[Int]] = {
+    // Compute the number of rows in the data
+    val numRows = {
+      val longNumRows: Long = rowStore.size
+      require(longNumRows < Int.MaxValue, s"rowToColumnStore given RDD with $longNumRows rows," +
+        s" but can handle at most ${Int.MaxValue} rows")
+      longNumRows.toInt
+    }
+
+    // Return an empty array for a dataset with zero rows or columns, otherwise
+    // return the transpose of the rowStore matrix
+    if (numRows == 0 || rowStore(0).size == 0) {
+      Array.empty
+    } else {
+      val numCols = rowStore(0).size
+      0.until(numCols).map { colIdx =>
+        rowStore.map(row => row(colIdx))
+      }.toArray
+    }
+  }
+
+  private[impl] def finalizeTree(
+      rootNode: Node,
+      algo: OldAlgo.Algo,
+      numClasses: Int,
+      numFeatures: Int,
+      parentUID: Option[String]): DecisionTreeModel = {
+    parentUID match {
+      case Some(uid) =>
+        if (algo == OldAlgo.Classification) {
+          new DecisionTreeClassificationModel(uid, rootNode, numFeatures = numFeatures,
+            numClasses = numClasses)
+        } else {
+          new DecisionTreeRegressionModel(uid, rootNode, numFeatures = numFeatures)
+        }
+      case None =>
+        if (algo ==
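The doc comment on `rowToColumnStoreDense` still mentions RDDs, partitions and sparsity, but the quoted method body is a plain dense transpose of the binned feature matrix. A minimal standalone sketch of that transpose follows; `ColumnStoreSketch` and `rowToColumnStore` are hypothetical names, and the equal-row-length check is an added assumption that the quoted code does not make.

```scala
object ColumnStoreSketch {

  /** Transposes a dense row-major matrix of binned feature values into per-feature columns. */
  def rowToColumnStore(rows: Array[Array[Int]]): Array[Array[Int]] =
    if (rows.isEmpty || rows(0).isEmpty) {
      // Zero rows or zero columns: there are no feature columns to build.
      Array.empty[Array[Int]]
    } else {
      require(rows.forall(_.length == rows(0).length), "all rows must have the same length")
      // Column `col` collects the value of feature `col` from every row, in row order.
      Array.tabulate(rows(0).length)(col => rows.map(_(col)))
    }
}
```

For two rows `(1, 2, 3)` and `(4, 5, 6)` this yields three columns: `(1, 4)`, `(2, 5)` and `(3, 6)`.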
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14872#discussion_r76853386

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,575 @@
[GitHub] spark pull request #14872: [SPARK-3162][MLlib][WIP] Add local tree training ...
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14872#discussion_r76853239

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,575 @@