[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-11-02 Thread codedeft
Github user codedeft closed the pull request at:

https://github.com/apache/spark/pull/2868


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-11-01 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61394403
  
@codedeft The merge script didn't close this PR automatically. Could you 
help close it? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-11-01 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61388530
  
I've merged this into master. Thanks @codedeft adding node id caching, and 
@chouqin @manishamde @jkbradley for reviewing the code!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-11-01 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61375497
  
It finally finished.

10 Trees, 30 depth limit. mnist8m, 20 executors:

15 hours with node Id cache.
21 hours without node Id cache.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-11-01 Thread manishamde
Github user manishamde commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61375098
  
@codeleft I am so sorry. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-11-01 Thread codeleft
Github user codeleft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61365784
  
@manishamde the person you want to respond to is @codedeft. I'm not 
involved with this project. Our names are close, but off by one letter. Sorry 
for the intrusion, I'll see myself out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61359747
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22684/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61359743
  
  [Test build #22684 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22684/consoleFull)
 for   PR 2868 at commit 
[`5f5a156`](https://github.com/apache/spark/commit/5f5a1564af1a8a1cbf6d257941ad969169295fe7).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread manishamde
Github user manishamde commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61359008
  
@codeleft I agree that local training should be a high priority. Just 
curious -- what's the depth of the tree in the failing case? 

I vote for merging this PR since there is no loss in performance for 
shallow trees and gain in performance for deep trees. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61358798
  
@mengxr @jkbradley Can you merge this? This is the only way you can 
effectively train 10 large trees with the mnist8m dataset.

With node Id cache, it took a very long time, but we were able to finish 
training 10 trees on mnist8m in 15 hours with 20 executors. SF with local 
training can finish this in 20 minutes, so local training would be a must in 
the next release.

However, without node Id cache, it looks like it's not even possible. It's 
currently only 60% of the way there and it's already taken 13 hours and dozens 
of fetch failures. I feel that it might eventually just fail because the models 
are just too big to pass around.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61358651
  
  [Test build #22684 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22684/consoleFull)
 for   PR 2868 at commit 
[`5f5a156`](https://github.com/apache/spark/commit/5f5a1564af1a8a1cbf6d257941ad969169295fe7).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61358267
  
The conflict is caused by the GBoosting check-in. I'm taking a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61340121
  
  [Test build #498 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/498/consoleFull)
 for   PR 2868 at commit 
[`a078fc8`](https://github.com/apache/spark/commit/a078fc867417c2100e6ea3dd5da19a94ae55fe36).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61336982
  
Yep, apparently so, but someone's working on fixing it ASAP


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61335866
  
Yea, I'm also getting Yarn compilation failure on my machine after doing 
the latest pull. Is this happening everywhere?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61335353
  
  [Test build #498 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/498/consoleFull)
 for   PR 2868 at commit 
[`a078fc8`](https://github.com/apache/spark/commit/a078fc867417c2100e6ea3dd5da19a94ae55fe36).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61335058
  
Some sort of YARN failure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61334801
  
  [Test build #22639 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22639/consoleFull)
 for   PR 2868 at commit 
[`a078fc8`](https://github.com/apache/spark/commit/a078fc867417c2100e6ea3dd5da19a94ae55fe36).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61334809
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22639/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61333710
  
LGTM  Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61327827
  
  [Test build #22639 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22639/consoleFull)
 for   PR 2868 at commit 
[`a078fc8`](https://github.com/apache/spark/commit/a078fc867417c2100e6ea3dd5da19a94ae55fe36).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61310862
  
@codedeft  I added 2 small comments.  Other than that, it LGTM.  Thanks for 
the PR!
CC: @mengxr


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19686161
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
--- End diff --

A bit unclear; perhaps update to: "For each TreePoint, an array over trees 
of the node index in each tree.  (Initially, values should all be 1 for root 
node.)"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-31 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19686166
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
 ---
@@ -102,6 +105,15 @@ object DecisionTreeRunner {
 .text(s"fraction of data to hold out for testing.  If given option 
testInput, " +
   s"this option is ignored. default: ${defaultParams.fracTest}")
 .action((x, c) => c.copy(fracTest = x))
+  opt[Boolean]("useNodeIdCache")
+.text(s"whether to use node Id cache during training.")
--- End diff --

Could you please print the default values for the 3 new options?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61212783
  
Your test analysis is pretty convincing!  Keeping the PR sounds good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61190259
  
I've addressed the comments. Please review at your convenience. I'll 
publish some big data results once they are actually done.

Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61189986
  
Ok, my performance test on the small mnist is still consistent (100 trees, 
30 depth limit). I think that the big reason for this is that when it's 
actually running in a cluster (as opposed to locally), we'll actually be 
transferring the trees and the actual transfer time can get significant once 
the model gets larger.

This time the times it took were (there was another heavy workload in the 
cluster, so overall slower),
without node id cache : 30 mins 57 seconds
with node id cache and checkpointing every 10 iterations : 19 mins 10 
seconds

This is shown in the time between 'collectAsMap' and 'mapPartitions'. Near 
the end, we see entries like these without node id cache.

213 mapPartitions at DecisionTree.scala:618 +details   2014/10/30 16:19:56  
6 s 
212 collectAsMap at DecisionTree.scala:647 +details2014/10/30 16:19:43  
0.2 s

As you can see, although collectAsMap only took 0.2 seconds starting from 
16:19:43, the mapPartitions doesn't start until 13 seconds later! So although 
the actual mapPartitions process took only 6 seconds, the overall time it took 
was 19 seconds.

Early on, the time inbetween is much smaller:

45  mapPartitions at DecisionTree.scala:618 +details  2014/10/30 15:56:09   
5 s 
44  collectAsMap at DecisionTree.scala:647 +details 2014/10/30 15:56:05 
3 s

In contrast, with node Id cache there's very little time inbetween these 
two steps either early in the process or later in the process, although in 
general mapPartitions seems to take a little more time:

44  mapPartitions at DecisionTree.scala:600 +details 2014/10/30 16:28:36
6 s
43  collectAsMap at DecisionTree.scala:647 +details 2014/10/30 16:28:33 
   3 s

212 mapPartitions at DecisionTree.scala:600 +details 2014/10/30 16:41:49
7 s 
211 collectAsMap at DecisionTree.scala:647 +details 2014/10/30 16:41:46 
   2 s

I guess that the reason we don't see too much improvement with larger 
datasets is that mapPartitions take much longer time, and the additional time 
it takes to transfer models become comparatively smaller in percentage.

I'm still running the 10 tree mnist8m 30 depth test, been running for 5+ 
hours.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61188396
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22565/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61188389
  
  [Test build #22565 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22565/consoleFull)
 for   PR 2868 at commit 
[`54656c5`](https://github.com/apache/spark/commit/54656c53cd30f243e9a7ff6ad76a2daede3b7aa0).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61179636
  
  [Test build #22565 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22565/consoleFull)
 for   PR 2868 at commit 
[`54656c5`](https://github.com/apache/spark/commit/54656c53cd30f243e9a7ff6ad76a2daede3b7aa0).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61173555
  
I agree it probably only used 2 executors since there were only 2 
partitions for the data.  (I think reduceByKey uses the same partitioner by 
default.)

I think it'd be good to include now, to be used more in the future with 
other optimizations & for dumping trees to disk, as you point out.

Thanks for running more tests!  Please let me know when it's ready for a 
final pass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61170031
  
Hm, I see. I'll try testing again on the small mnist but my previous test 
was on a cluster with 8 executors. However, I realize now that it probably only 
utilized 2 our of 8 executors (seems like reducebykey that's used doesn't 
really use extra executors?).

In addition to being useful for deep trees and local-training, I do think 
that another usefulness of node-Id-cache is if you want to write the model 
directly to disk without keeping them in memory.

So even if we may not see any performance benefit for now, I do think that 
we'll need them later. So I guess that the decision is really up to you on 
whether to include this now or later. It's just that you probably don't want to 
have it hanging for too long. As long as it gets in soon after release, I'm ok.

I'm currently running deep tree test, albeit with only 10 trees on mnist8m. 
I'll see if I can see some benefit here at least.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61167709
  
I ran some local tests but did not see any speedups.  This was trying to 
mimic your earlier test:
* original mnist dataset
* depths 5, 10, 20, and 30
* 1 compute node (figuring 1 node would make node ID caching most helpful)
Running times were virtually the same between the master & the master + 
this PR.  I only tested it with 1 tree and 5 trees, but I would not expect that 
to make a huge difference from 100 trees.

Thinking about when distributed node ID caching might speed things up, I 
could imagine it being most helpful with an imbalanced tree, where there was 
some path which a lot of instances followed.  I.e., the tree gets very deep 
before we can switch to local training.  This sounds a bit unlikely to me.  
However, I would be OK with merging this as protection against such an event, 
especially since I did not see slowdowns from this PR.

What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61165010
  
I too hope caching will be useful later on.  One last thing I'm trying is 
running locally (on a beefier machine than my laptop).  If it helps in local 
mode, it might be worth keeping, especially since I did not see slow-downs in 
the distributed setting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread manishamde
Github user manishamde commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61156969
  
@codedeft @jkbradley I have not followed the discussion very closely 
(apologies!) but at the high level, could we add local training support along 
with this PR possibly after the 1.2 code freeze. I think there is a lot of 
effort put into this and it will be a shame to see it wasted especially since 
it is relevant to local training. I vote for keeping this PR alive and add 
local training feature to it. :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61155725
  
Yea, I'm trying to run depth 30 tests, but I got failures (both without and 
with node Id cache) that seem to happen often in our clusters when using 
TorrentBroadcast. Trying to test again with HttpBroadcast. But anyhow, I have a 
hard time imagining people training deep trees without local training. So for 
now, node Id cache seems not very necessary.

I think though that this might be a good addition for adding local training 
later. Eventually once deep trees become very easy to train, I think passing 
them back and forth would not be advisable. So this could be a check-in for 
future preparation. What do you think?

It's hard to compare against Sequoia Forest because SF has been highly 
optimized in data structures, and currently even without local-training runs 
about 3 times faster than this (e.g. it took 18 minutes to train 100 trees with 
depthlimit of 10 on mnist8m without local training whereas DecisionTreeRunner 
took about an hour).

I think it has a lot to do with a lot of small things (e.g., SF doesn't 
need to pass back and forth bin information, doesn't use any map structure to 
prevent auto-boxing and faster lookup, etc.). So I'm not sure if the node Id 
cache had anything to do with it. These are optimizations we can add later on 
MLLib as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-30 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61153266
  
I got similar results on 16 nodes using MNIST8m; basically no change in 
runtime (+/- a few percent at most).  But those tests were for shallow trees.  
I worry that this patch will only help for depths at which we might as well 
switch to local training.  In tests with Sequoia Forests, do you know how much 
distributed node ID caching helps?  I wonder if node ID caching will only be 
helpful for local training.

Running more tests...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-29 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61037009
  
I agree local sub-tree training will be needed to train deep trees.  That 
should probably be the next priority.  I'm running some tests now and will see 
if I see different speedups (also on mnist8m, though I may try some synthetic 
datasets later for variety).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-29 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-61026533
  
I've been doing some larger dataset (8 million rows with 784 features) 
testing on node Id cache and I don't think that node Id cache will do much for 
shallow trees. I'm trying to see where the 'sweet spot' is, but it may have to 
be well beyond depth 10 for node Id cache to be useful.

Anyhow, it's taking an extremely long time to train to begin with for these 
big trees with only around 20 executors. I actually gave up to train upto 30 
depth level because it was taking upward of 8+ hours to train 100 trees. So the 
local sub-tree training is really essential here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19518754
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  var nodeIdsForInstances: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // Keep a reference to a previous node Ids for instances.
+  // Because we will keep on re-persisting updated node Ids,
+  // we want to unpersist the previous RDD.
+  var prevNodeIdsForInstances: RDD[Array[Int]] = null
--- End diff --

The current implementation should be correct. When we compute `nodeIds` 
(this is where `nodeIds` gets materialized), `prevNodeIds` should be cached. So 
after the computation, we have both RDDs cached. `count()` doesn't help here. 
`take(1)` may only cache the first partition.

One thing to watch is the closure size of those RDDs. If we happen to 
reference lots of stuff, it will increase the storage requirement on the 
driver, who needs to remember how to re-compute each RDD.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

--

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19518756
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  var nodeIdsForInstances: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // Keep a reference to a previous node Ids for instances.
+  // Because we will keep on re-persisting updated node Ids,
+  // we want to unpersist the previous RDD.
+  var prevNodeIdsForInstances: RDD[Array[Int]] = null
+
+  // To keep track of the past checkpointed RDDs.
--- End diff --

An alternative solution to checkpointing is to recompute `nodeIds` from 
scratch every few iterations. It is reliable as long as the driver stays alive. 
If we do that, users don't need to set checkpoint dir and we don't need to 
manage checkpoint files. I hope the overhead is not big.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional com

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60863634
  
Another thought: This checkpointing logic seems like it will be useful for 
a bunch of algorithms in the future.  It would be nice to abstract it into some 
class which has standard parameters (checkpointInterval, etc.) and a method 
like:
```
def checkpointIfNeeded(dataTransform: RDD -> RDD)
```
which would handle persisting, checkpointing, unpersisting, and removing 
checkpoint files.  It can be a future PR though.  It would be useful for the 
GradientBoosting PR: [https://github.com/apache/spark/pull/2607]


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19513979
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  var nodeIdsForInstances: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // Keep a reference to a previous node Ids for instances.
+  // Because we will keep on re-persisting updated node Ids,
+  // we want to unpersist the previous RDD.
+  var prevNodeIdsForInstances: RDD[Array[Int]] = null
--- End diff --

take(1) is probably a reasonable thing to do. But a hack nonetheless. Lol


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19512071
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  var nodeIdsForInstances: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // Keep a reference to a previous node Ids for instances.
+  // Because we will keep on re-persisting updated node Ids,
+  // we want to unpersist the previous RDD.
+  var prevNodeIdsForInstances: RDD[Array[Int]] = null
--- End diff --

@mengxr  Can you weigh in on this (if you know of a good fix)?  Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19510500
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -584,6 +648,13 @@ object DecisionTree extends Serializable with Logging {
 
 timer.stop("chooseSplits")
 
+val nodeIdUpdaters = if (nodeIdCache.nonEmpty) {
+  Array
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19510465
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  var nodeIdsForInstances: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // Keep a reference to a previous node Ids for instances.
+  // Because we will keep on re-persisting updated node Ids,
+  // we want to unpersist the previous RDD.
+  var prevNodeIdsForInstances: RDD[Array[Int]] = null
--- End diff --

That's what I tried first, but then I found out that the RDD doesn't 
actually get persisted when I call 'persist'. It seems that the actual 
persistence won't happen until some action method gets called outside of this 
method. Do you know if there's a way to trigger persistence right there and 
then? Same with checkpointing, because that would make life much easier from 
the management perspective (I personally don't like these implicit 
under-the-hood things very much... ;)).

Additionally, I think that there'll always be some window where you'll have 
two RDDs persisted since for performance, you want to get the next persisted 
one from the previously persisted one. So that's one downside of this approach 
since this will require more memory than direct-tree approach. At least until 
the trees ge

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19510480
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  var nodeIdsForInstances: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // Keep a reference to a previous node Ids for instances.
+  // Because we will keep on re-persisting updated node Ids,
+  // we want to unpersist the previous RDD.
+  var prevNodeIdsForInstances: RDD[Array[Int]] = null
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19510132
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19510120
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19510115
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19510109
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -613,6 +684,14 @@ object DecisionTree extends Serializable with Logging {
   node.rightNode = Some(Node(Node.rightChildIndex(nodeIndex),
 stats.rightPredict, stats.rightImpurity, rightChildIsLeaf))
 
+  if (nodeIdCache.nonEmpty) {
+val nodeIndexUpdater = NodeIndexUpdater(
+  split = split,
+  nodeIndex = nodeIndex)
+nodeIdUpdaters(treeIndex) =
+  nodeIdUpdaters(treeIndex) ++ Map(nodeIndex -> 
nodeIndexUpdater)
--- End diff --

This was one of those thoughtless choices because I didn't think that this 
will have that much of a performance impact. But I could change this more 
simply to a mutable Map if you'd like. I think it'll be cheaper and maybe 
slightly faster, particularly for this collection addition.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60845877
  
@codedeft  Thanks for the updates!  I added some small comments above.  
Feel free to ignore the OpenHashMap suggestion, unless you find a problem in 
your tests.  After these small fixes, I think it will be ready.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19509769
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
--- End diff --

Can a new method be added to clean up any remaining checkpoint files at the 
end of training?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19509685
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
--- End diff --

Please add ":: DeveloperApi ::" to start of doc here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19509616
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  var nodeIdsForInstances: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // Keep a reference to a previous node Ids for instances.
+  // Because we will keep on re-persisting updated node Ids,
+  // we want to unpersist the previous RDD.
+  var prevNodeIdsForInstances: RDD[Array[Int]] = null
--- End diff --

Can this be a temp value within `updateNodeIndices()`?  It could hold a 
reference to the current RDD until the next RDD is ready and 
persisted/checkpointed, and then it could be unpersisted.  Currently, it looks 
like 2 RDDs will be persisted at any given time (rather than just during 
`updateNodeIndices()`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spar

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19509528
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+Node.leftChildIndex(nodeIndex)
+  } else {
+Node.rightChildIndex(nodeIndex)
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param nodeIdsForInstances The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  var nodeIdsForInstances: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // Keep a reference to a previous node Ids for instances.
+  // Because we will keep on re-persisting updated node Ids,
+  // we want to unpersist the previous RDD.
+  var prevNodeIdsForInstances: RDD[Array[Int]] = null
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
--- End diff --

Can `checkpointQueue` and `rddUpdateCount` be made private?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19508975
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param nodeIndex The current node index of a data point that this will 
update.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+nodeIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
--- End diff --

Since you now know `split` available, can you replace 
`bins(featureIndex)(binIndex).highSplit.threshold` with `split.threshold` and 
then eliminate the `bins` argument?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19508647
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
--- End diff --

Organize imports (this one above the org.apache.spark imports)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19508499
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -584,6 +648,13 @@ object DecisionTree extends Serializable with Logging {
 
 timer.stop("chooseSplits")
 
+val nodeIdUpdaters = if (nodeIdCache.nonEmpty) {
+  Array
--- End diff --

fits on one line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19508498
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -613,6 +684,14 @@ object DecisionTree extends Serializable with Logging {
   node.rightNode = Some(Node(Node.rightChildIndex(nodeIndex),
 stats.rightPredict, stats.rightImpurity, rightChildIsLeaf))
 
+  if (nodeIdCache.nonEmpty) {
+val nodeIndexUpdater = NodeIndexUpdater(
+  split = split,
+  nodeIndex = nodeIndex)
+nodeIdUpdaters(treeIndex) =
+  nodeIdUpdaters(treeIndex) ++ Map(nodeIndex -> 
nodeIndexUpdater)
--- End diff --

Have you checked out timing to see if using a Map like this causes issues 
with garbage collection?  I'm wondering if using something like 
org.apache.spark.util.collection.OpenHashMap would be more efficient; you could 
construct the map using an OpenHashMap and then cast it to an immutable map.  
I've only tested locally so far, and it does not seem to be an issue.  But we 
can keep it in mind for distributed tests if we ever see that the internal 
timing for findBestSplits is significantly different from chooseSplits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60729383
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22351/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60729378
  
  [Test build #22351 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22351/consoleFull)
 for   PR 2868 at commit 
[`58a7b3e`](https://github.com/apache/spark/commit/58a7b3eaa7a51bfc2da85c7144bd721e5c252acf).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60723953
  
Updated codes that at every iteration, persist new cache values while 
unpersisting old values have been submitted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60721861
  
  [Test build #22351 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22351/consoleFull)
 for   PR 2868 at commit 
[`58a7b3e`](https://github.com/apache/spark/commit/58a7b3eaa7a51bfc2da85c7144bd721e5c252acf).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-27 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60713165
  
Here's one number. But this requires constant re-caching new node Id caches 
and unpersisting old node Id caches that is not reflected in the code yet. I'm 
not sure if frequent persisting of a new RDD from a previously persisted RDD is 
a cheap operation, but at least in this data set, it seems fast. Let me know if 
you guys know more about persistence mechanism.

mnist dataset, 750 columns with 6 rows (only two partitions). 8 
executors. 10-class classification. 100 trees trained, 30 max depth. Gini. With 
the default fraction testing.

Without node-id caching, it took 24 mins 34 seconds.
With node-id caching with persisting the cache every two iteration, it took 
16 minutes 42 seconds.

So we see noticeable benefits, as long as we frequently recache the node Id 
cache.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-27 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60712109
  
Currently doing some performance testing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60711278
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22332/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60711275
  
  [Test build #22332 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22332/consoleFull)
 for   PR 2868 at commit 
[`e08ef62`](https://github.com/apache/spark/commit/e08ef62b7026b342ba5b4623734a0879533a3aec).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60707307
  
  [Test build #22332 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22332/consoleFull)
 for   PR 2868 at commit 
[`e08ef62`](https://github.com/apache/spark/commit/e08ef62b7026b342ba5b4623734a0879533a3aec).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-27 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60634290
  
CC: @manishamde If you have time to take a look!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-23 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19306308
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
--- End diff --

I think that one good benefit of Decision Tree over NearestNeighbor, even 
in unpruned state, is the compactness of representation. It's still a lot less 
storage for models of decision trees, compared to having to store entire 
datasets.

But yea, it still suffers like a lot of other non-parametric algorithms 
(e.g. SVM, etc.) whose models grow proportionally to dataset sizes, especially 
if unpruned.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so,

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-23 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19300415
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
--- End diff --

I agree it could get deep quickly, and switching indexing systems seems 
good for a future PR.  (W.r.t. training without pruning, I suspect it would be 
better and more efficient to use outright nearest neighbor since completely 
unpruned trees are basically doing nearest neighbor.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
wi

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-22 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19249614
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
--- End diff --

I think that this is fine as long as you get relatively balanced trees, 
since the tree would be extremely big by the time you reach level 30.

However, based on my experience, the problem in practice is that you often 
get unbalanced trees. E.g., when I train on 60,000 sample mnist without 
pruning, I often got a tree with close to ~100 level deep, even though the 
number of nodes was only around 5000 or so.

I will do it the way you suggested by simply calling Node.leftChildId, 
Node.rightChildId.

For future, though, I think that a relatively easy way to do thi

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-22 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19247743
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
 // Finally, only best Splits for nodes are collected to driver to 
construct decision tree.
 val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
 val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
-val nodeToBestSplits =
+
+val partitionAggregates = if (useNodeIdCache) {
+  input.zip(nodeIdCache.get.cur).mapPartitions { points =>
+// Construct a nodeStatsAggregators array to hold node aggregate 
stats,
+// each node will have a nodeStatsAggregator
+val nodeStatsAggregators = Array.tabulate(numNodes) { nodeIndex =>
+  val featuresForNode = nodeToFeaturesBc.value.flatMap { 
nodeToFeatures =>
+Some(nodeToFeatures(nodeIndex))
+  }
+  new DTStatsAggregator(metadata, featuresForNode)
+}
+
+// iterator all instances in current partition and update 
aggregate stats
+points.foreach(binSeqOpWithNodeIdCache(nodeStatsAggregators, _))
--- End diff --

Good point; ignore my comment please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-22 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19247666
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
+   * @param nodeIdUpdaters A map of node index updaters.
+   *   The key is the indices of nodes that we want to 
update.
+   * @param bins Bin information needed to find child node indices.
+   */
+  def updateNodeIndices(
+  nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
+  bins: Array[Array[Bin]]): Unit = {
+val updatedRDD = data.zip(cur).map {
+  dataPoint => {
+cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
+  treeId => {
+val nodeIdUpdater = 
nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), n

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-22 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19247637
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
--- End diff --

True, since Node indices are Integers (not Longs), 30+ level trees are a 
problem.  We could definitely switch to Long at some point---or even eliminate 
indices, though that might require extra work in some places.  I tried to keep 
the node indexing logic grouped within Node.scala at least.

My opinion is that the current indexing system is reasonable; if we really 
support 62+ depth trees, then moving to larger integer types seems OK.  It 
would be a bit more storage but not that much relative to node size.  Also, at 
that depth, individual trees would likely need to be distr

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195671
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
 // Finally, only best Splits for nodes are collected to driver to 
construct decision tree.
 val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
 val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
-val nodeToBestSplits =
+
+val partitionAggregates = if (useNodeIdCache) {
+  input.zip(nodeIdCache.get.cur).mapPartitions { points =>
+// Construct a nodeStatsAggregators array to hold node aggregate 
stats,
+// each node will have a nodeStatsAggregator
+val nodeStatsAggregators = Array.tabulate(numNodes) { nodeIndex =>
+  val featuresForNode = nodeToFeaturesBc.value.flatMap { 
nodeToFeatures =>
+Some(nodeToFeatures(nodeIndex))
+  }
+  new DTStatsAggregator(metadata, featuresForNode)
+}
+
+// iterator all instances in current partition and update 
aggregate stats
+points.foreach(binSeqOpWithNodeIdCache(nodeStatsAggregators, _))
--- End diff --

Well, one requires zip and the other one doesn't,  so fundamentally changes 
the type of rows.

Additionally, I think if we just branch out within mapPartitions, won't it 
unnecessarily serialize some things that are not used in one branch and not the 
other? E.g. it seems that binSeqOp itself becomes object and will be 
serialized, along with closure items.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195610
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -515,6 +523,34 @@ object DecisionTree extends Serializable with Logging {
 }
 
 /**
+ * Do the same thing as bingSeqOp, but with nodeIdCache.
+ */
+def binSeqOpWithNodeIdCache(
+agg: Array[DTStatsAggregator],
+dataPoint: (BaggedPoint[TreePoint], Array[Int])): 
Array[DTStatsAggregator] = {
+  treeToNodeToIndexInfo.foreach { case (treeIndex, nodeIndexToInfo) =>
+val baggedPoint = dataPoint._1
+val nodeIdCache = dataPoint._2
+val nodeIndex = nodeIdCache(treeIndex)
+val nodeInfo = nodeIndexToInfo.getOrElse(nodeIndex, null)
+// We are processing this point only if it's in the 
nodeIndexToInfo map.
--- End diff --

Will do that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195598
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -629,6 +699,10 @@ object DecisionTree extends Serializable with Logging {
   }
 }
 
+if (useNodeIdCache) {
--- End diff --

Yea, maybe it's not needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60040201
  
Thanks for all the comments guys. I'll address them and resubmit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195595
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -584,6 +642,9 @@ object DecisionTree extends Serializable with Logging {
 
 timer.stop("chooseSplits")
 
+val nodeIdUpdaters = Array
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195587
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195544
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
+   * @param nodeIdUpdaters A map of node index updaters.
+   *   The key is the indices of nodes that we want to 
update.
+   * @param bins Bin information needed to find child node indices.
+   */
+  def updateNodeIndices(
+  nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
+  bins: Array[Array[Bin]]): Unit = {
+val updatedRDD = data.zip(cur).map {
+  dataPoint => {
+cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
+  treeId => {
+val nodeIdUpdater = 
nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), nu

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195515
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195486
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
+   * @param nodeIdUpdaters A map of node index updaters.
+   *   The key is the indices of nodes that we want to 
update.
+   * @param bins Bin information needed to find child node indices.
+   */
+  def updateNodeIndices(
+  nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
+  bins: Array[Array[Bin]]): Unit = {
+val updatedRDD = data.zip(cur).map {
+  dataPoint => {
+cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
--- End diff --

Will do. I used it because spire was included somehow (maybe one of the 
dependent packages u

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread codedeft
Github user codedeft commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19195461
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
--- End diff --

Yes, I noticed that you have a rule for determining the node indices 
through bit shifts. However, I was wondering if this is something that could 
potentially change in the future, and maybe leave that logic outside.

E.g. this seems to be a primary reason that 30+ level trees can't be 
trained at the moment and you might want to use a different logic in the future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60013991
  
Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-59998408
  
@codedeft  Done with a pass.  It's looking quite good.  My main comments 
are about code duplication and simplification; I like the general approach.  
Let me know when I should make another pass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19178539
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -629,6 +699,10 @@ object DecisionTree extends Serializable with Logging {
   }
 }
 
+if (useNodeIdCache) {
--- End diff --

Small comment: I wonder if it would be better to check nodeIdCache.nonEmpty 
instead of using another value useNodeIdCache; it seems less error-prone.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19178228
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -584,6 +642,9 @@ object DecisionTree extends Serializable with Logging {
 
 timer.stop("chooseSplits")
 
+val nodeIdUpdaters = Array
--- End diff --

Only allocate if useNodeIdCache?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19178115
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
 // Finally, only best Splits for nodes are collected to driver to 
construct decision tree.
 val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
 val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
-val nodeToBestSplits =
+
+val partitionAggregates = if (useNodeIdCache) {
+  input.zip(nodeIdCache.get.cur).mapPartitions { points =>
+// Construct a nodeStatsAggregators array to hold node aggregate 
stats,
+// each node will have a nodeStatsAggregator
+val nodeStatsAggregators = Array.tabulate(numNodes) { nodeIndex =>
+  val featuresForNode = nodeToFeaturesBc.value.flatMap { 
nodeToFeatures =>
+Some(nodeToFeatures(nodeIndex))
+  }
+  new DTStatsAggregator(metadata, featuresForNode)
+}
+
+// iterator all instances in current partition and update 
aggregate stats
+points.foreach(binSeqOpWithNodeIdCache(nodeStatsAggregators, _))
--- End diff --

(from above)  You can just test ```useNodeIdCache``` at this line.  That 
should reduce code duplication.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19178079
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
 // Finally, only best Splits for nodes are collected to driver to 
construct decision tree.
 val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
 val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
-val nodeToBestSplits =
+
+val partitionAggregates = if (useNodeIdCache) {
--- End diff --

Also, could the 2 branches (of the ```if (useNodeIdCache)```) call be 
combined, where you test for ```useNodeIdCache``` only for the 1 line which 
differs?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19177914
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
 // Finally, only best Splits for nodes are collected to driver to 
construct decision tree.
 val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
 val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
-val nodeToBestSplits =
+
+val partitionAggregates = if (useNodeIdCache) {
--- End diff --

Could you please state the type of partitionAggregates for code clarity?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19177832
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -515,6 +523,34 @@ object DecisionTree extends Serializable with Logging {
 }
 
 /**
+ * Do the same thing as bingSeqOp, but with nodeIdCache.
+ */
+def binSeqOpWithNodeIdCache(
+agg: Array[DTStatsAggregator],
+dataPoint: (BaggedPoint[TreePoint], Array[Int])): 
Array[DTStatsAggregator] = {
+  treeToNodeToIndexInfo.foreach { case (treeIndex, nodeIndexToInfo) =>
+val baggedPoint = dataPoint._1
+val nodeIdCache = dataPoint._2
+val nodeIndex = nodeIdCache(treeIndex)
+val nodeInfo = nodeIndexToInfo.getOrElse(nodeIndex, null)
+// We are processing this point only if it's in the 
nodeIndexToInfo map.
--- End diff --

Could the chunk of code below be removed into another function so that it 
is not duplicated in binSeqOp?  The function could sit outside of 
findBestSplits().


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19177555
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -515,6 +523,34 @@ object DecisionTree extends Serializable with Logging {
 }
 
 /**
+ * Do the same thing as bingSeqOp, but with nodeIdCache.
--- End diff --

"bingSeqOp" --> "binSeqOp"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19176858
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
+   * @param nodeIdUpdaters A map of node index updaters.
+   *   The key is the indices of nodes that we want to 
update.
+   * @param bins Bin information needed to find child node indices.
+   */
+  def updateNodeIndices(
+  nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
+  bins: Array[Array[Bin]]): Unit = {
+val updatedRDD = data.zip(cur).map {
+  dataPoint => {
+cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
+  treeId => {
+val nodeIdUpdater = 
nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), n

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19176539
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
+   * @param nodeIdUpdaters A map of node index updaters.
+   *   The key is the indices of nodes that we want to 
update.
+   * @param bins Bin information needed to find child node indices.
+   */
+  def updateNodeIndices(
+  nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
+  bins: Array[Array[Bin]]): Unit = {
+val updatedRDD = data.zip(cur).map {
+  dataPoint => {
+cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
+  treeId => {
+val nodeIdUpdater = 
nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), n

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19175188
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
--- End diff --

When you rename ```cur``` to ```SOMENAME```, it would be great to rename 
```updatedRDD``` as well to ```updatedSOMENAME```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19174918
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
+ *  the checkpointed files will be stored.
+ * @param checkpointInterval The checkpointing interval
+ *   (how often should the cache be checkpointed.).
+ */
+@DeveloperApi
+private[tree] class NodeIdCache(
+  val data: RDD[BaggedPoint[TreePoint]],
+  var cur: RDD[Array[Int]],
+  val checkpointDir: Option[String],
+  val checkpointInterval: Int) {
+
+  // To keep track of the past checkpointed RDDs.
+  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
+  var rddUpdateCount = 0
+  if (checkpointDir != None) {
+cur.sparkContext.setCheckpointDir(checkpointDir.get)
+  }
+
+  /**
+   * Update the node index values in the cache.
+   * This updates the RDD and its lineage.
+   * TODO: Passing bin information to executors seems unnecessary and 
costly.
--- End diff --

It would be cheaper (and maybe easier) to pass an Array[Map[Int, Split]] 
(replacing NodeIndexUpdater with Split).  You already have that info when you 
construct nodeIdUpdaters in DecisionTree.scala, and you don't have to 
explicitly store the left/right child IDs.  You can compute child IDs from the 
parent ID using Node.leftChildIndex (and Node.rightChildIndex).  Hopefully that 
simplifies the code some.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-21 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/2868#discussion_r19174472
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.tree.configuration.FeatureType._
+import spire.implicits._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.tree.model.{Bin, Split}
+
+/**
+ * :: DeveloperApi ::
+ * This is used by the node id cache to find the child id that a data 
point would belong to.
+ * @param split Split information.
+ * @param leftChildIndex Left child index.
+ * @param rightChildIndex Right child index.
+ */
+@DeveloperApi
+private[tree] case class NodeIndexUpdater(
+split: Split,
+leftChildIndex: Int,
+rightChildIndex: Int) {
+  /**
+   * Determine a child node index based on the feature value and the split.
+   * @param binnedFeatures Binned feature values.
+   * @param bins Bin information to convert the bin indices to approximate 
feature values.
+   * @return Child node index to update to.
+   */
+  def updateNodeIndex(binnedFeatures: Array[Int], bins: 
Array[Array[Bin]]): Int = {
+if (split.featureType == Continuous) {
+  val featureIndex = split.feature
+  val binIndex = binnedFeatures(featureIndex)
+  val featureValueUpperBound = 
bins(featureIndex)(binIndex).highSplit.threshold
+  if (featureValueUpperBound <= split.threshold) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+} else {
+  if 
(split.categories.contains(binnedFeatures(split.feature).toDouble)) {
+leftChildIndex
+  } else {
+rightChildIndex
+  }
+}
+  }
+}
+
+/**
+ * A given TreePoint would belong to a particular node per tree.
+ * This is used to keep track of which node for a particular tree that a 
TreePoint belongs to.
+ * A separate RDD of Array[Int] needs to be maintained and updated at each 
iteration.
+ * @param data The RDD of training rows.
+ * @param cur The initial values in the cache
+ *(should be an Array of all 1's (meaning the root nodes)).
+ * @param checkpointDir The checkpoint directory where
--- End diff --

Currently, this skips checkpointing if checkpointDir == None.  However, a 
user could set the SparkContext checkpointDir before calling DecisionTree.  Can 
the behavior be changed as follows:
* If a checkpointDir is given here, then it should overwrite any preset 
checkpointDir in SparkContext.
* If no checkpointDir is given, then the code should check the SparkContext 
(via cur.sparkContext.getCheckpointDir) to see if one has already been set.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >