[GitHub] spark pull request: [SPARK-3790][MLlib] CosineSimilarity Example

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2622#issuecomment-57896544
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21287/consoleFull) for PR 2622 at commit [`eca3dfd`](https://github.com/apache/spark/commit/eca3dfd62c1ce3643ef03b44f79c3e840b27a390).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3790][MLlib] CosineSimilarity Example

2014-10-03 Thread rezazadeh
Github user rezazadeh commented on the pull request:

https://github.com/apache/spark/pull/2622#issuecomment-57896484
  
Parameters are now configurable.
Added approximation error reporting.
Added JIRA.
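
For context, a minimal sketch of the kind of error reporting described above, assuming the example compares MLlib's exact `columnSimilarities()` against the DIMSUM-sampled `columnSimilarities(threshold)` (the helper below is hypothetical, not the PR's code):

```scala
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Hypothetical helper: mean absolute error of the DIMSUM approximation
// against the brute-force cosine similarities.
def approxError(mat: RowMatrix, threshold: Double): Double = {
  val exact  = mat.columnSimilarities()           // exact, all pairs
  val approx = mat.columnSimilarities(threshold)  // DIMSUM sampling

  val exactEntries  = exact.entries.map(e => ((e.i, e.j), e.value))
  val approxEntries = approx.entries.map(e => ((e.i, e.j), e.value))

  // Pairs the sampler dropped contribute their full value as error.
  val errors = exactEntries.leftOuterJoin(approxEntries).values.map {
    case (u, Some(v)) => math.abs(u - v)
    case (u, None)    => math.abs(u)
  }
  errors.mean()
}
```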


[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2647#issuecomment-57896055
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21286/consoleFull) for PR 2647 at commit [`5fc1259`](https://github.com/apache/spark/commit/5fc12597afe5964c7b9f688fd2919426b928b3ec).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2647#issuecomment-57896058
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21286/


[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2647#issuecomment-57894903
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21286/consoleFull) for PR 2647 at commit [`5fc1259`](https://github.com/apache/spark/commit/5fc12597afe5964c7b9f688fd2919426b928b3ec).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2647#discussion_r18427632
  
--- Diff: project/SparkBuild.scala ---
@@ -99,6 +99,30 @@ object SparkBuild extends PomBuild {
   v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", 
"")).toSeq
   }
 
+  // NOTE: If you change the default version for each profile,
+  // also corresponding section in pom.xml should be cnahged.
--- End diff --

Thanks, I've fixed it.


[GitHub] spark pull request: [SPARK-2098] All Spark processes should suppor...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2379#issuecomment-57894238
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21285/


[GitHub] spark pull request: [SPARK-2098] All Spark processes should suppor...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2379#issuecomment-57894236
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21285/consoleFull) for PR 2379 at commit [`80b0b12`](https://github.com/apache/spark/commit/80b0b12abd15620890523af2d455fef3902ad545).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

2014-10-03 Thread derrickburns
Github user derrickburns commented on the pull request:

https://github.com/apache/spark/pull/2634#issuecomment-57894117
  
I ran the style tests. They pass. Is there something else in the style guide
that is not captured in the tests?

I have expended much effort to avoid serializing unnecessary objects. I'm
still perplexed as to why so much data is being captured in the closure that
the test fails.

Anyway, what are the next steps? Omit the test and approve the PR? Ask
someone to help fix the code to avoid the unit test failure?

Thanks!

Sent from my iPhone

> On Oct 3, 2014, at 12:17 PM, Xiangrui Meng wrote:
> 
> @derrickburns The *ClusterSuite was created to prevent pulling
> unnecessary objects into the task closure. You can try to remove Serializable
> from the algorithms. While the models are serializable, the algorithm instances
> should stay on the driver node. If you want to use a member method in a task
> closure, either make it static or define it as a local method. If you want to
> use a member variable, assign it to a val first.
> 
> This is something we can try. Avoiding serialization of unnecessary objects is
> a good practice, but I'm not sure whether it is worth the effort.
> 
> Btw, could you update your PR following the
> https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide ?
> Thanks!
> 
> —
> Reply to this email directly or view it on GitHub.
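
As a side note, here is a minimal, hypothetical sketch of the pattern @mengxr describes (the `Algo` class below is invented for illustration, not the code under review): copy members into locals so the task closure does not capture `this`.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical algorithm class; instances are meant to stay on the driver.
class Algo(val k: Int) {
  def run(data: RDD[Double]): Long = {
    // Capturing `k` or a member method directly would pull `this` (the whole
    // Algo instance) into the closure, forcing Algo to be Serializable:
    // data.filter(x => x < k).count()

    // Instead, assign the member variable to a local val and use a local
    // method; only these locals are shipped with the task.
    val localK = k
    def keep(x: Double): Boolean = x < localK
    data.filter(keep).count()
  }
}
```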


[GitHub] spark pull request: [SPARK-2098] All Spark processes should suppor...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2379#issuecomment-57893391
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21285/consoleFull) for PR 2379 at commit [`80b0b12`](https://github.com/apache/spark/commit/80b0b12abd15620890523af2d455fef3902ad545).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2651#issuecomment-57893128
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21284/consoleFull) for PR 2651 at commit [`c4f5778`](https://github.com/apache/spark/commit/c4f57780ac03065ee8cecb71c3a950a68277f82f).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2651#issuecomment-57893129
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21284/


[GitHub] spark pull request: [SPARK-3762] clear reference of SparkEnv after...

2014-10-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2624#issuecomment-57892709
  
@JoshRosen the simple fix is to delete the thread-local variable completely.
Then any access from any thread (even a thread in Py4J's pool) is going to see
the reset state.
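
A rough sketch of the effect @tdas describes (hypothetical holder object, not SparkEnv itself): with the thread-local gone and a single shared field in its place, clearing the reference once is visible to every thread, including Py4J's thread pool.

```scala
// Hypothetical stand-in for the SparkEnv reference under discussion.
object EnvHolder {
  // A ThreadLocal would leave stale per-thread copies behind after stop();
  // one @volatile field has exactly one value, shared by all threads.
  @volatile private var env: Option[String] = None

  def set(e: String): Unit = { env = Some(e) }
  def get: Option[String] = env
  def clear(): Unit = { env = None }  // every thread now observes the reset
}
```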


[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2651#issuecomment-57891695
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21284/consoleFull) for PR 2651 at commit [`c4f5778`](https://github.com/apache/spark/commit/c4f57780ac03065ee8cecb71c3a950a68277f82f).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...

2014-10-03 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2651#issuecomment-57891571
  
/cc @davies @cocoatomo @robbles for reviews / feedback.


[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...

2014-10-03 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2651#issuecomment-57891560
  
/cc @davies @cocotomo @robbles for reviews / feedback.


[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...

2014-10-03 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/2651

[SPARK-3772] Allow `ipython` to be used by Pyspark workers; IPython fixes:

This pull request addresses a few issues related to PySpark's IPython 
support:

- Fix the remaining uses of the '-u' flag, which IPython doesn't support 
(see SPARK-3772).
- Change PYSPARK_PYTHON_OPTS to PYSPARK_DRIVER_PYTHON_OPTS, so that the old 
name is reserved in case we ever want to allow the worker Python options to be 
customized (this variable was introduced in #2554 and hasn't landed in a 
release yet, so this doesn't break any compatibility).
- Retain the old semantics for IPYTHON=1 and IPYTHON_OPTS (to avoid 
breaking existing example programs).

There are more details in a large block comment in `bin/pyspark`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-3772

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2651.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2651


commit c4f57780ac03065ee8cecb71c3a950a68277f82f
Author: Josh Rosen 
Date:   2014-10-03T22:43:16Z

[SPARK-3772] Allow ipython to be used by Pyspark workers; IPython fixes:

- Fix the remaining uses of the '-u' flag, which IPython doesn't support.
- Change PYSPARK_PYTHON_OPTS to PYSPARK_DRIVER_PYTHON_OPTS, so that the old
  name is reserved in case we ever want to allow the worker Python options
  to be customized.




[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread sarutak
GitHub user sarutak reopened a pull request:

https://github.com/apache/spark/pull/2647

[SPARK-3787] Assembly jar name is wrong when we build with sbt omitting 
-Dhadoop.version

This PR is another solution for the following problem: when we build with sbt
using a hadoop profile but without a property for the hadoop version, like:

sbt/sbt -Phadoop-2.2 assembly

the jar name always uses the default version (1.0.4).

When we build with maven under the same conditions, the default version for
each profile is used. For instance, if we build like:

mvn -Phadoop-2.2 package

the jar name uses hadoop2.2.0 as the default version for hadoop-2.2.
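
A minimal sketch of the kind of fallback this PR adds (profile names and versions here are assumptions for illustration, not the actual diff in project/SparkBuild.scala): prefer an explicit -Dhadoop.version, else the enabled profile's default, else the 1.0.4 default.

```scala
// Hypothetical mapping from build profile to its default hadoop version.
val profileDefaults = Map(
  "hadoop-2.2" -> "2.2.0",
  "hadoop-2.3" -> "2.3.0",
  "hadoop-2.4" -> "2.4.0"
)

def effectiveHadoopVersion(profiles: Seq[String]): String =
  sys.props.get("hadoop.version")                      // -Dhadoop.version wins
    .orElse(profiles.flatMap(profileDefaults.get).headOption)
    .getOrElse("1.0.4")                                // no profile, no property
```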

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sarutak/spark fix-assembly-jarname

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2647


commit eebbb7d03423efefcf1cdfb4bab8cbf7348f08a7
Author: Kousuke Saruta 
Date:   2014-10-03T22:13:47Z

Fixed wrong jar name




[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2650#issuecomment-57891486
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21282/


[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread sarutak
Github user sarutak closed the pull request at:

https://github.com/apache/spark/pull/2647


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2650#issuecomment-57891483
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21282/consoleFull) for PR 2650 at commit [`0e36be7`](https://github.com/apache/spark/commit/0e36be7f174d932c4acabf73b5ca8b76caf54a61).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2649#issuecomment-57891421
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21283/consoleFull) for PR 2649 at commit [`c938845`](https://github.com/apache/spark/commit/c938845534f2581c8761df507a671c48898db0fa).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)`
  * `case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)`



[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2649#issuecomment-57891422
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21283/


[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests

2014-10-03 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2646#discussion_r18426767
  
--- Diff: python/pyspark/shuffle.py ---
@@ -428,7 +427,7 @@ def _recursive_merged_items(self, start):
 subdirs = [os.path.join(d, "parts", str(i))
for d in self.localdirs]
 m = ExternalMerger(self.agg, self.memory_limit, 
self.serializer,
-   subdirs, self.scale * self.partitions)
+   subdirs, self.scale * self.partitions, 
self.partitions)
--- End diff --

Good catch!


[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests

2014-10-03 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2646#discussion_r18426749
  
--- Diff: python/pyspark/tests.py ---
@@ -152,7 +152,7 @@ def test_external_sort(self):
 self.assertGreater(shuffle.DiskBytesSpilled, last)
 
 def test_external_sort_in_rdd(self):
-conf = SparkConf().set("spark.python.worker.memory", "1m")
+conf = SparkConf().set("spark.python.worker.memory", "10m")
--- End diff --

Why did this test originally change the worker memory?  Is the goal here to 
force spilling?

Maybe we could add an undocumented "always spill / always externalize"
configuration option to force spilling irrespective of memory limits in order
to test this code. We might still want tests like this, though, to check that
the memory usage monitoring is working correctly, although we could also write
a separate test that covers only the memory monitoring.


[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests

2014-10-03 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2646#discussion_r18426755
  
--- Diff: python/pyspark/tests.py ---
@@ -152,7 +152,7 @@ def test_external_sort(self):
 self.assertGreater(shuffle.DiskBytesSpilled, last)
 
 def test_external_sort_in_rdd(self):
-conf = SparkConf().set("spark.python.worker.memory", "1m")
+conf = SparkConf().set("spark.python.worker.memory", "10m")
--- End diff --

I ask because I wonder whether increasing this value will change the 
behavior of the test.


[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests

2014-10-03 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2646#issuecomment-57890710
  
This is a great set of refactorings!  Thanks for improving the consistency 
of the test suite names.


[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests

2014-10-03 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2646#discussion_r18426715
  
--- Diff: python/pyspark/tests.py ---
@@ -754,27 +756,19 @@ def test_serialize_nested_array_and_map(self):
 self.assertEqual("2", row.d)
 
 
-class TestIO(PySparkTestCase):
-
-def test_stdout_redirection(self):
-import subprocess
-
-def func(x):
-subprocess.check_call('ls', shell=True)
-self.sc.parallelize([1]).foreach(func)
--- End diff --

Yeah, it looks like this test was added in 
57b64d0d1902eb51bf79f595626c2b9f80a9d1e2 (part of the original PySpark PR) but 
I don't remember what it was for and agree that it's weird as written.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2650#issuecomment-57890533
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21281/


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2650#issuecomment-57890530
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21281/consoleFull) for PR 2650 at commit [`772aead`](https://github.com/apache/spark/commit/772aeada36bbc569597632360b086b54e7bb05f3).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: Event proration based on event timestamps.

2014-10-03 Thread bijaybisht
Github user bijaybisht commented on the pull request:

https://github.com/apache/spark/pull/2633#issuecomment-57890383
  
I don't understand why it failed this time. Can the test be retriggered?


[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] pom.xml and SparkB...

2014-10-03 Thread ScrapCodes
Github user ScrapCodes commented on the pull request:

https://github.com/apache/spark/pull/2520#issuecomment-5790
  
Hey, I will check this patch very soon. I have the impression that these
changes to SparkBuild are not needed. Even if they are needed, then something
needs to be fixed in the pom reader plugin.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2649#issuecomment-57888369
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21280/consoleFull) for PR 2649 at commit [`9f7b571`](https://github.com/apache/spark/commit/9f7b571aca22fed7ca85dab23e306bff5147cfc7).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2649#issuecomment-57888370
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21280/


[GitHub] spark pull request: [SPARK-3486][MLlib][PySpark] PySpark support f...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2356#issuecomment-57888189
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21279/consoleFull) for PR 2356 at commit [`a73fa19`](https://github.com/apache/spark/commit/a73fa19786bca754ecf8567bc83bdce1f90569ee).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)`
  * `case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)`
  * `class Word2VecModel(object):`
  * `class Word2Vec(object):`



[GitHub] spark pull request: [SPARK-3486][MLlib][PySpark] PySpark support f...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2356#issuecomment-57888194
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21279/


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2649#issuecomment-57888024
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21283/consoleFull) for PR 2649 at commit [`c938845`](https://github.com/apache/spark/commit/c938845534f2581c8761df507a671c48898db0fa).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2650#issuecomment-57888034
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21282/consoleFull) for PR 2650 at commit [`0e36be7`](https://github.com/apache/spark/commit/0e36be7f174d932c4acabf73b5ca8b76caf54a61).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread erikerlandson
Github user erikerlandson commented on the pull request:

https://github.com/apache/spark/pull/2455#issuecomment-57883154
  
@mengxr I'll be occupied next week, but I'll try to respond ASAP to your
feedback the week after.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2650#issuecomment-57882214
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21281/consoleFull) for PR 2650 at commit [`772aead`](https://github.com/apache/spark/commit/772aeada36bbc569597632360b086b54e7bb05f3).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/2650

[SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federati...

...on (1.1 version).

HA and federation use namespaces instead of host names, so you can't
resolve them; trying will just fail. So be smarter and avoid doing
unnecessary work.
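
A minimal sketch of the idea (hypothetical helper for illustration; the actual change lives in the yarn client code): compare scheme and host first, and treat a failed DNS lookup of an HA/federation nameservice ID as "not the same filesystem" instead of letting the resolution error propagate.

```scala
import java.net.{InetAddress, UnknownHostException, URI}

// Hypothetical helper: equality must not require DNS resolution, because
// HA/federation authorities are nameservice IDs that do not resolve.
def sameFileSystem(a: URI, b: URI): Boolean = {
  if (a.getScheme != b.getScheme) return false
  val (ha, hb) = (a.getHost, b.getHost)
  if (ha == null || hb == null) return false
  if (ha.equalsIgnoreCase(hb)) return true
  // Last resort: try resolving; an unresolvable name just means "different".
  try {
    InetAddress.getByName(ha).getCanonicalHostName ==
      InetAddress.getByName(hb).getCanonicalHostName
  } catch {
    case _: UnknownHostException => false
  }
}
```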

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-3788-1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2650


commit 772aeada36bbc569597632360b086b54e7bb05f3
Author: Marcelo Vanzin 
Date:   2014-10-03T23:47:00Z

[SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federation 
(1.1 version).

HA and federation use namespaces instead of host names, so you can't
resolve them; trying will just fail. So be smarter and avoid doing
unnecessary work.




[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2649#issuecomment-57881327
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21280/consoleFull) for PR 2649 at commit [`9f7b571`](https://github.com/apache/spark/commit/9f7b571aca22fed7ca85dab23e306bff5147cfc7).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3486][MLlib][PySpark] PySpark support f...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2356#issuecomment-57881048
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21279/consoleFull) for PR 2356 at commit [`a73fa19`](https://github.com/apache/spark/commit/a73fa19786bca754ecf8567bc83bdce1f90569ee).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2646#issuecomment-57881070
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/264/consoleFull) for PR 2646 at commit [`6a2a4b0`](https://github.com/apache/spark/commit/6a2a4b02fa06881d1397146cc552a13de3c69a9c).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/2649#issuecomment-57880944
  
I tested this on both regular and federated HDFS and verified that the "Upload
foo..." message in the logs does not show up in either, while it would show up
for federated HDFS before.


[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...

2014-10-03 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/2649

[SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federati...

...on.

HA and federation use namespaces instead of host names, so you can't
resolve them; trying will just fail. So be smarter and avoid doing
unnecessary work.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-3788

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2649


commit 9f7b571aca22fed7ca85dab23e306bff5147cfc7
Author: Marcelo Vanzin 
Date:   2014-10-03T23:30:25Z

[SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federation.

HA and federation use namespaces instead of host names, so you can't
resolve them; trying will just fail. So be smarter and avoid doing
unnecessary work.




[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2647#issuecomment-57880751
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21278/consoleFull) for PR 2647 at commit [`eebbb7d`](https://github.com/apache/spark/commit/eebbb7d03423efefcf1cdfb4bab8cbf7348f08a7).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)`
  * `case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)`
  * `case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext)`
  * `case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode`
  * `case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode`
  * `case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)`



[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2647#issuecomment-57880772
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21278/


[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2497#issuecomment-57880702
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21277/consoleFull) for PR 2497 at commit [`75cde8c`](https://github.com/apache/spark/commit/75cde8cea0ace51ed3204211453f5d5df01349f9).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2497#issuecomment-57880723
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21277/


[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/2455#issuecomment-57874204
  
@erikerlandson I didn't check the test code. I will try to find time to make a
pass on the tests. The implementation looks good to me except for minor inline
comments. Could you create a JIRA for `.drop(...)` in sampling and link it to
the upstream Scala JIRA, so we will remember to update it later? Thanks!


[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread jkleckner
Github user jkleckner commented on a diff in the pull request:

https://github.com/apache/spark/pull/2647#discussion_r18423582
  
--- Diff: project/SparkBuild.scala ---
@@ -99,6 +99,30 @@ object SparkBuild extends PomBuild {
   v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", 
"")).toSeq
   }
 
+  // NOTE: If you change the default version for each profile,
+  // also corresponding section in pom.xml should be cnahged.
--- End diff --

Typo cnahged -> changed


[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423491
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")
 
-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom
 
   override def setSeed(seed: Long) = rng.setSeed(seed)
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        })
+      } else {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        })
+      }
     }
   }
 
   /**
    *  Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+    new BernoulliPartitionSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
+}
 
-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps),
+    "Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.gsmDefault) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault)
+    } else {
+      items.filter(_ => (rng.nextDouble() <= fraction))
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }
 
+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
 
-  private[random] var rng = new Poisson(mean, new DRand)
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))
 
   override def setSeed(seed: Long) {
-    rng = new Poisson(mean, new DRand(seed.toInt))
+    curseed = seed
+    rng = new Poisson(fraction, new DRand(seed.toInt))
   }
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.flatMap { item =>
-      val count = rng.nextInt()
-      if (count == 0) {
-        Iterator.empty
-      } else {
-        Iterator.fill(count)(item)
-      }
+    if (fraction <= 0.0) {
+      Iterator.

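The range predicate in the BernoulliPartitionSampler above keeps an item exactly
when a fresh uniform draw lands in [lb, ub); because each element consumes one
draw regardless of the outcome, samplers seeded identically but given disjoint
ranges select disjoint, exhaustive subsets of the input, which is the mechanism
RDD.randomSplit relies on. The sketch below illustrates both schemes in the diff
using plain java.util.Random for brevity; XORShiftRandom, the eps guards, and
colt's Poisson generator from the actual patch are left out, and splitSample /
poissonDraw / sampleWithReplacement are illustrative names, not Spark APIs.

    import java.util.Random

    // Keep items whose uniform draw falls in [lb, ub). With a fixed seed,
    // disjoint ranges partition the input deterministically.
    def splitSample[T](items: Iterator[T], lb: Double, ub: Double, seed: Long): Iterator[T] = {
      val rng = new Random(seed)
      items.filter { _ =>
        val x = rng.nextDouble()  // one draw per element, in lockstep across samplers
        x >= lb && x < ub
      }
    }

    val left  = splitSample((1 to 10).iterator, 0.0, 0.3, seed = 42L).toList
    val right = splitSample((1 to 10).iterator, 0.3, 1.0, seed = 42L).toList
    // Every element of 1..10 appears in exactly one of `left` and `right`.

    // With-replacement sampling in the style of PoissonSampler: each element
    // is emitted a Poisson(fraction)-distributed number of times. Knuth's
    // multiplication method stands in for colt's Poisson generator here.
    def poissonDraw(mean: Double, rng: Random): Int = {
      val limit = math.exp(-mean)
      var k = 0
      var p = 1.0
      do { k += 1; p *= rng.nextDouble() } while (p > limit)
      k - 1
    }

    def sampleWithReplacement[T](items: Iterator[T], fraction: Double, seed: Long): Iterator[T] = {
      val rng = new Random(seed)
      items.flatMap(item => Iterator.fill(poissonDraw(fraction, rng))(item))
    }
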
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423498
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423493
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423500
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423504
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423499
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423485
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423495
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423492
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423489
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423437
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "tradtional" bernoulli sampling is faster.  The
--- End diff --

bernoulli -> `Bernoulli`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
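
The threshold documented in the quoted comment is the crux of the gap-sampling
optimization: for small fractions f, instead of drawing one uniform per element,
the sampler draws the size of the gap to the next accepted element from a
geometric distribution, so the work scales with the number of accepted elements
rather than the input size. Below is a minimal sketch of that idea, assuming
java.util.Random (the patch's GapSamplingIterator, XORShiftRandom, and its
epsilon handling differ in detail); gapSample is an illustrative name, not a
Spark API.

    import java.util.Random

    // Bernoulli(f) sampling via geometric gaps: the number of elements
    // skipped between accepted ones satisfies P(G = k) = (1 - f)^k * f,
    // and G = floor(log(u) / log(1 - f)) for uniform u has that law.
    def gapSample[T](items: Iterator[T], f: Double, rng: Random): Iterator[T] = {
      require(f > 0.0 && f < 1.0, "fraction must be in (0, 1)")
      val lnq = math.log1p(-f)  // log(1 - f), computed stably for small f
      new Iterator[T] {
        private var pending: Option[T] = None
        private def advance(): Unit = {
          val u = math.max(rng.nextDouble(), 1e-300)  // guard against log(0)
          var skip = (math.log(u) / lnq).toInt        // geometric gap size
          while (skip > 0 && items.hasNext) { items.next(); skip -= 1 }
          pending = if (items.hasNext) Some(items.next()) else None
        }
        advance()
        override def hasNext: Boolean = pending.isDefined
        override def next(): T = { val v = pending.get; advance(); v }
      }
    }

As the quoted comment indicates, above the crossover fraction the geometric
gaps are mostly zero and the log per accepted element costs more than one plain
uniform draw per element, so the patch falls back to traditional Bernoulli
filtering there.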



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423464
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423484
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423470
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

(quoted diff identical to the first SPARK-3250 message above; the remainder of this message was truncated in the archive)

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423457
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = 
false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: 
Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub  + eps), "Lower bound (lb) must be <= upper bound 
(ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")
 
-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom
 
   override def setSeed(seed: Long) = rng.setSeed(seed)
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-items.filter { item =>
-  val x = rng.nextDouble()
-  (x >= lb && x < ub) ^ complement
+if (ub - lb <= 0.0) {
+  if (complement) items else Iterator.empty
+} else {
+  if (complement) {
+items.filter(item => {
+  val x = rng.nextDouble()
+  (x < lb) || (x >= ub)
+})
+  } else {
+items.filter(item => {
+  val x = rng.nextDouble()
+  (x >= lb) && (x < ub)
+})
+  }
 }
   }
 
  /**
   * Return a sampler that is the complement of the range specified by the current sampler.
   */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+new BernoulliPartitionSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
+}
 
-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps)  &&  fraction <= (1.0 + eps),
+"Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+if (fraction <= 0.0) {
+  Iterator.empty
+} else if (fraction >= 1.0) {
+  items
+} else if (fraction <= RandomSampler.gsmDefault) {
+  new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault)
+} else {
+  items.filter(_ => (rng.nextDouble() <= fraction))
+}
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }
 
+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from a Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
 
-  private[random] var rng = new Poisson(mean, new DRand)
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))
 
   override def setSeed(seed: Long) {
-rng = new Poisson(mean, new DRand(seed.toInt))
+curseed = seed
+rng = new Poisson(fraction, new DRand(seed.toInt))
   }
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-items.flatMap { item =>
-  val count = rng.nextInt()
-  if (count == 0) {
-Iterator.empty
-  } else {
-Iterator.fill(count)(item)
-  }
+if (fraction <= 0.0) {
+  Iterator.
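The `BernoulliPartitionSampler` above accepts exactly the items whose uniform draw lands in `[lb, ub)`, and `cloneComplement` flips that predicate. Presumably (the quoted hunk does not show the call site) a pair of such samplers sharing one seed produces disjoint splits of the same input, because each item receives the identical draw in both passes. A self-contained sketch of that behavior, with hypothetical names and the seed passed through the constructor rather than via `setSeed`:

    import java.util.Random

    // Sketch: range-based Bernoulli partitioning over [lb, ub).
    class RangeSamplerSketch(lb: Double, ub: Double, complement: Boolean, seed: Long) {
      def sample(items: Iterator[Int]): Iterator[Int] = {
        val rng = new Random(seed) // identical draw sequence across the pair
        items.filter { _ =>
          val x = rng.nextDouble()
          val inRange = x >= lb && x < ub
          if (complement) !inRange else inRange
        }
      }
      def cloneComplement: RangeSamplerSketch =
        new RangeSamplerSketch(lb, ub, !complement, seed)
    }

    object SplitDemo extends App {
      val s = new RangeSamplerSketch(0.0, 0.3, complement = false, seed = 7L)
      val kept    = s.sample(Iterator.range(0, 10)).toList
      val dropped = s.cloneComplement.sample(Iterator.range(0, 10)).toList
      println((kept, dropped)) // disjoint, and together they cover 0..9
    }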

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423473
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423487
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423468
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423448
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
--- End diff --

use `RandomSampler.epsArgs` directly


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
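The change suggested above, dropping the per-instance `@transient val eps` and referencing the shared constant at each use site, would look roughly like the following. This is a sketch of the suggested shape only; the `epsArgs` value below is a hypothetical stand-in, since the quoted hunk does not show the constant's definition:

    object SamplerConsts {
      // stand-in for RandomSampler.epsArgs, which the PR defines on the
      // RandomSampler companion object
      val epsArgs: Double = 1e-6 // hypothetical value, for illustration
    }

    // Bounds checks tolerate floating point jitter without caching eps locally.
    class BoundsCheckSketch(lb: Double, ub: Double) {
      import SamplerConsts.epsArgs
      require(lb <= (ub + epsArgs), "Lower bound (lb) must be <= upper bound (ub)")
      require(lb >= (0.0 - epsArgs), "Lower bound (lb) must be >= 0.0")
      require(ub <= (1.0 + epsArgs), "Upper bound (ub) must be <= 1.0")
    }

Besides brevity, this avoids carrying a field that is only needed while the constructor's checks run.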



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423443
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
 throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster.  The
+   * optimal value for this will depend on the RNG.  More expensive RNGs will tend to make
+   * the optimal value higher.  The most reliable way to determine this value for a given RNG
+   * is to experiment.  I would expect a value of 0.5 to be close in most cases.
+   */
+  def gsmDefault: Double = 0.4
+
+  /**
+   * Default gap sampling epsilon
+   * When sampling random floating point values the gap sampling logic requires value > 0.  An
--- End diff --

What do you mean by `value`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
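The `value` in question is presumably the uniform draw that gap sampling feeds into a logarithm: instead of one coin flip per item, the gap to the next accepted item is drawn from a geometric distribution, `gap = floor(log(u) / log(1 - f))`, which diverges when `u = 0`. A sketch of that idea, assuming nothing about the PR's `GapSamplingIterator` beyond the `(items, fraction, rng, eps)` argument list quoted elsewhere in this thread (the default `eps` below is illustrative):

    import java.util.Random

    // Sketch: accept each item with probability f by skipping geometrically
    // distributed gaps, P(gap = k) = f * (1 - f)^k, via inversion sampling.
    class GapSamplingSketch[T](items: Iterator[T], f: Double, rng: Random,
                               eps: Double = 5e-11) extends Iterator[T] {
      require(f > 0.0 && f < 1.0, "sketch assumes a fraction strictly in (0, 1)")
      private val lnq = math.log1p(-f) // log(1 - f), strictly negative

      private def advance(): Unit = {
        // clamp the draw away from zero so log(u) stays finite
        val u = math.max(rng.nextDouble(), eps)
        var gap = (math.log(u) / lnq).toInt
        while (gap > 0 && items.hasNext) { items.next(); gap -= 1 }
      }

      advance() // position on the first accepted element
      override def hasNext: Boolean = items.hasNext
      override def next(): T = { val v = items.next(); advance(); v }
    }

Only one `nextDouble` is consumed per accepted element, which is why the optimization pays off for small fractions and why `gsmDefault` caps the range where it applies.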



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423433
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
 throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
--- End diff --

add `.` to the end or insert an empty line. You can check the generated doc 
by `sbt/sbt unidoc`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
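Scaladoc takes the first sentence, up to the first `.` or a blank line, as a member's summary, so without the suggested fix the whole comment body runs into the summary. The quoted doc with the fix applied would read roughly as follows (reflowed here for illustration):

    object RandomSamplerDocSketch {
      /**
       * Default gap sampling maximum.
       *
       * For sampling fractions <= this value, the gap sampling optimization
       * will be applied; above this value, it is assumed that "traditional"
       * Bernoulli sampling is faster.
       */
      def gsmDefault: Double = 0.4
    }

Running `sbt/sbt unidoc` regenerates the API docs so the rendered summary can be checked.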



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423478
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423454
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423461
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423474
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423463
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423479
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423453
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423459
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423475
  
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub  + eps), "Lower bound (lb) must be <= upper bound 
(ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")
 
-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom
 
   override def setSeed(seed: Long) = rng.setSeed(seed)
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-items.filter { item =>
-  val x = rng.nextDouble()
-  (x >= lb && x < ub) ^ complement
+if (ub - lb <= 0.0) {
+  if (complement) items else Iterator.empty
+} else {
+  if (complement) {
+items.filter(item => {
+  val x = rng.nextDouble()
+  (x < lb) || (x >= ub)
+})
+  } else {
+items.filter(item => {
+  val x = rng.nextDouble()
+  (x >= lb) && (x < ub)
+})
+  }
 }
   }
 
   /**
*  Return a sampler that is the complement of the range specified by 
the current sampler.
*/
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, 
ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+new BernoulliPartitionSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
+}
 
-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling 
probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends 
RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps)  &&  fraction <= (1.0 + eps),
+"Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+if (fraction <= 0.0) {
+  Iterator.empty
+} else if (fraction >= 1.0) {
+  items
+} else if (fraction <= RandomSampler.gsmDefault) {
+  new GapSamplingIterator(items, fraction, rng, 
RandomSampler.epsDefault)
+} else {
+  items.filter(_ => (rng.nextDouble() <= fraction))
+}
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }
 
+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from 
Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends 
RandomSampler[T, T] {
 
-  private[random] var rng = new Poisson(mean, new DRand)
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))
 
   override def setSeed(seed: Long) {
-rng = new Poisson(mean, new DRand(seed.toInt))
+curseed = seed
+rng = new Poisson(fraction, new DRand(seed.toInt))
   }
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-items.flatMap { item =>
-  val count = rng.nextInt()
-  if (count == 0) {
-Iterator.empty
-  } else {
-Iterator.fill(count)(item)
-  }
+if (fraction <= 0.0) {
+  Iterator.

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423477
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = 
false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: 
Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub  + eps), "Lower bound (lb) must be <= upper bound 
(ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")
 
-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom
 
   override def setSeed(seed: Long) = rng.setSeed(seed)
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-items.filter { item =>
-  val x = rng.nextDouble()
-  (x >= lb && x < ub) ^ complement
+if (ub - lb <= 0.0) {
+  if (complement) items else Iterator.empty
+} else {
+  if (complement) {
+items.filter(item => {
+  val x = rng.nextDouble()
+  (x < lb) || (x >= ub)
+})
+  } else {
+items.filter(item => {
+  val x = rng.nextDouble()
+  (x >= lb) && (x < ub)
+})
+  }
 }
   }
 
   /**
*  Return a sampler that is the complement of the range specified by 
the current sampler.
*/
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, 
ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+new BernoulliPartitionSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
+}
 
-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling 
probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends 
RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps)  &&  fraction <= (1.0 + eps),
+"Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+if (fraction <= 0.0) {
+  Iterator.empty
+} else if (fraction >= 1.0) {
+  items
+} else if (fraction <= RandomSampler.gsmDefault) {
+  new GapSamplingIterator(items, fraction, rng, 
RandomSampler.epsDefault)
+} else {
+  items.filter(_ => (rng.nextDouble() <= fraction))
+}
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }
 
+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from 
Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends 
RandomSampler[T, T] {
 
-  private[random] var rng = new Poisson(mean, new DRand)
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))
 
   override def setSeed(seed: Long) {
-rng = new Poisson(mean, new DRand(seed.toInt))
+curseed = seed
+rng = new Poisson(fraction, new DRand(seed.toInt))
   }
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-items.flatMap { item =>
-  val count = rng.nextInt()
-  if (count == 0) {
-Iterator.empty
-  } else {
-Iterator.fill(count)(item)
-  }
+if (fraction <= 0.0) {
+  Iterator.

[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423429
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
 throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
--- End diff --

`@DeveloperApi` is not necessary for package private classes.
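
For illustration, the suggestion amounts to something like this (a minimal 
sketch; `java.util.Random` stands in for Spark's internal `XORShiftRandom` 
so the snippet is self-contained):

~~~scala
package org.apache.spark.util.random

import java.util.Random

// Package-private: invisible outside org.apache.spark, so an
// @DeveloperApi annotation here would carry no information for users.
private[spark] object RandomSampler {
  def rngDefault: Random = new Random
}
~~~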


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423444
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
 throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization 
will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling 
is faster.  The
+   * optimal value for this will depend on the RNG.  More expensive RNGs 
will tend to make
+   * the optimal value higher.  The most reliable way to determine this 
value for a given RNG
+   * is to experiment.  I would expect a value of 0.5 to be close in most 
cases.
+   */
+  def gsmDefault: Double = 0.4
+
+  /**
+   * Default gap sampling epsilon
+   * When sampling random floating point values the gap sampling logic 
requires value > 0.  An
+   * optimal value for this parameter is at or near the minimum positive 
floating point value
+   * returned by nextDouble() for the RNG being used.
+   */
+  def epsDefault: Double = 5e-11
--- End diff --

The name `epsDefault` is not very clear to me. It could be a `val`.
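
A sketch of one way to address both points (the name 
`defaultGapSamplingEpsilon` is only a hypothetical suggestion, not what 
the patch settled on):

~~~scala
object RandomSampler {
  // a val with a self-describing name: the smallest positive value
  // that the gap sampling logic should treat as nonzero
  val defaultGapSamplingEpsilon: Double = 5e-11
}
~~~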


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423440
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
 throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization 
will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling 
is faster.  The
+   * optimal value for this will depend on the RNG.  More expensive RNGs 
will tend to make
+   * the optimal value higher.  The most reliable way to determine this 
value for a given RNG
+   * is to experiment.  I would expect a value of 0.5 to be close in most 
cases.
+   */
+  def gsmDefault: Double = 0.4
+
+  /**
+   * Default gap sampling epsilon
--- End diff --

ditto: add `.`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423426
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -43,7 +43,8 @@ import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
-import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, 
SamplingUtils}
+import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, 
BernoulliPartitionSampler,
+ SamplingUtils}
--- End diff --

2-space indentation may be better (thinking of the case when the package 
name is really long)
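
That is, something like the following layout (a sketch of the suggestion):

~~~scala
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler,
  BernoulliPartitionSampler, SamplingUtils}
~~~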


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423449
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = 
false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: 
Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub  + eps), "Lower bound (lb) must be <= upper bound 
(ub)")
--- End diff --

include the values of `lb` and `ub` in the message. remove the extra space 
between `ub` and `+`
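
For example, with string interpolation (a sketch; the exact wording is up 
to the patch):

~~~scala
require(lb <= (ub + eps), s"Lower bound ($lb) must be <= upper bound ($ub)")
~~~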


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423438
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
 throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization 
will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling 
is faster.  The
+   * optimal value for this will depend on the RNG.  More expensive RNGs 
will tend to make
+   * the optimal value higher.  The most reliable way to determine this 
value for a given RNG
+   * is to experiment.  I would expect a value of 0.5 to be close in most 
cases.
+   */
+  def gsmDefault: Double = 0.4
--- End diff --

We should be more specific on the name. `gsm` is not a common acronym (for 
sampling). I would recommend some names like `defaultMaxGapSamplingProb`. (This 
only applies to Bernoulli sampling but adding `Bernoulli` makes the name too 
long.)

It could be a val: `val defaultMaxGapSamplingProb = 0.4`. (We don't need 
type info for primitive types.)
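
Putting the two suggestions together (a sketch; `defaultMaxGapSamplingProb` 
is the proposed name, not necessarily the final one):

~~~scala
object RandomSampler {
  // sampling fractions at or below this use the gap sampling optimization
  val defaultMaxGapSamplingProb = 0.4
}

// the call site in BernoulliSampler.sample would then read:
// } else if (fraction <= RandomSampler.defaultMaxGapSamplingProb) {
~~~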


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...

2014-10-03 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2455#discussion_r18423427
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -375,7 +376,9 @@ abstract class RDD[T: ClassTag](
 val sum = weights.sum
 val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
 normalizedCumWeights.sliding(2).map { x =>
-  new PartitionwiseSampledRDD[T, T](this, new 
BernoulliSampler[T](x(0), x(1)), true, seed)
+  new PartitionwiseSampledRDD[T, T](this,
--- End diff --

The following style is commonly used in Spark:
 
~~~
  new PartitionwiseSampledRDD[T, T](
this, new BernoulliPartitionSampler[T](x(0), x(1)), true, seed)
~~~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3758] [Windows] Wrong EOL character in ...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2612#issuecomment-57873138
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21276/Test 
PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3758] [Windows] Wrong EOL character in ...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2612#issuecomment-57873123
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21276/consoleFull)
 for   PR 2612 at commit 
[`91fb0fd`](https://github.com/apache/spark/commit/91fb0fdf1a3c4492853c0ecc9ffa9d723d369cd4).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case e: Exception => logError("Source class " + classPath + " 
cannot be instantiated", e)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark] RDD take() method: overestimate too mu...

2014-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2648#issuecomment-57872445
  
Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark] RDD take() method: overestimate too mu...

2014-10-03 Thread yingjieMiao
GitHub user yingjieMiao opened a pull request:

https://github.com/apache/spark/pull/2648

[Spark] RDD take() method: overestimate too much

In the comment (Line 1083), it says: "Otherwise, interpolate the number of 
partitions we need to try, but overestimate it by 50%."

`(1.5 * num * partsScanned / buf.size).toInt` is an estimate of the total 
number of partitions needed, so in each iteration we should only add the 
increment `(1.5 * num * partsScanned / buf.size).toInt - partsScanned`. 
The existing implementation instead grows `partsScanned` exponentially 
(roughly `x_{n+1} >= (1.5 + 1) * x_n`).

This could be a performance problem (unless it is the intended behavior).
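
In code terms, the difference is roughly the following (a sketch only; the 
names follow the `take()` locals quoted above, and the actual patch may 
differ):

~~~scala
// How many more partitions to scan in the next round.
// num = items requested, bufSize = items collected so far,
// partsScanned = partitions already scanned.
def nextPartsToTry(num: Int, bufSize: Int, partsScanned: Int): Int = {
  // the 1.5x formula estimates the TOTAL number of partitions needed...
  val estimatedTotal = (1.5 * num * partsScanned / bufSize).toInt
  // ...so scan only the increment, rather than adding the whole estimate
  // on top of partsScanned (which is what grows it exponentially)
  math.max(estimatedTotal - partsScanned, 1)
}
~~~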



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yingjieMiao/spark rdd_take

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2648


commit a2aa36b6838ff71941dab1d4af5c8e5f79fd4b4f
Author: yingjieMiao 
Date:   2014-10-03T22:26:01Z

RDD take method: overestimate too much




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2646#issuecomment-57871751
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/264/consoleFull)
 for   PR 2646 at commit 
[`6a2a4b0`](https://github.com/apache/spark/commit/6a2a4b02fa06881d1397146cc552a13de3c69a9c).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2647#issuecomment-57870821
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21278/consoleFull)
 for   PR 2647 at commit 
[`eebbb7d`](https://github.com/apache/spark/commit/eebbb7d03423efefcf1cdfb4bab8cbf7348f08a7).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...

2014-10-03 Thread sarutak
GitHub user sarutak opened a pull request:

https://github.com/apache/spark/pull/2647

[SPARK-3787] Assembly jar name is wrong when we build with sbt omitting 
-Dhadoop.version

This PR is another solution for the following problem: when we build with 
sbt with a hadoop profile but without the hadoop version property, like:

{code}
sbt/sbt -Phadoop-2.2 assembly
{code}

the jar name always uses the default version (1.0.4).

When we build with maven under the same conditions, the default version of 
each profile is used. For instance, if we build like:

{code}
mvn -Phadoop-2.2 package
{code}

the jar name uses 2.2.0, the default hadoop version for the hadoop-2.2 
profile.
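
(In the meantime, specifying the version explicitly, e.g. `sbt/sbt 
-Phadoop-2.2 -Dhadoop.version=2.2.0 assembly`, produces the expected jar 
name.)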

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sarutak/spark fix-assembly-jarname

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2647


commit eebbb7d03423efefcf1cdfb4bab8cbf7348f08a7
Author: Kousuke Saruta 
Date:   2014-10-03T22:13:47Z

Fixed wrong jar name




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...

2014-10-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2497#issuecomment-57869998
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21277/consoleFull)
 for   PR 2497 at commit 
[`75cde8c`](https://github.com/apache/spark/commit/75cde8cea0ace51ed3204211453f5d5df01349f9).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...

2014-10-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2497#discussion_r18421944
  
--- Diff: core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ---
@@ -148,14 +146,19 @@ private[spark] object JettyUtils extends Logging {
   holder.setClassName(filter)
   // Get any parameters for each filter
   val paramName = "spark." + filter + ".params"
-  val params = conf.get(paramName, 
"").split(',').map(_.trim()).toSet
-  params.foreach {
+  val params = conf.get(paramName, 
"").split(',').map(_.trim()).foreach {
--- End diff --

Ah, I see what I did there. Scala is weird at times.
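
The wrinkle in isolation (a minimal sketch, separate from the patch): 
`foreach` returns `Unit`, so binding its result to a `val` compiles but 
the binding is useless.

~~~scala
val params = Seq("k1=v1", "k2=v2").map(_.trim).foreach(println)
// params: Unit -- the val binds (), not a collection, which is why
// chaining .foreach onto a val assignment looks right but isn't
~~~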


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...

2014-10-03 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2497#discussion_r18421854
  
--- Diff: core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ---
@@ -148,14 +146,19 @@ private[spark] object JettyUtils extends Logging {
   holder.setClassName(filter)
   // Get any parameters for each filter
   val paramName = "spark." + filter + ".params"
-  val params = conf.get(paramName, 
"").split(',').map(_.trim()).toSet
-  params.foreach {
+  val params = conf.get(paramName, 
"").split(',').map(_.trim()).foreach {
--- End diff --

Weird, how did you get the master one to work?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


