[GitHub] spark pull request: [SPARK-3790][MLlib] CosineSimilarity Example
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2622#issuecomment-57896544 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21287/consoleFull) for PR 2622 at commit [`eca3dfd`](https://github.com/apache/spark/commit/eca3dfd62c1ce3643ef03b44f79c3e840b27a390). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3790][MLlib] CosineSimilarity Example
Github user rezazadeh commented on the pull request: https://github.com/apache/spark/pull/2622#issuecomment-57896484 Parameters are now configurable. Added approximation error reporting. Added JIRA.
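For readers following along: the example in this PR compares exact column similarities against the sampling-based DIMSUM approximation and reports the error. A minimal sketch of that flow, assuming MLlib's `RowMatrix.columnSimilarities` API; the input path, threshold value, and object name below are illustrative, not the PR's code:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object CosineSimilaritySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CosineSimilaritySketch"))
    // Each input line is a space-separated row of the matrix ("data.txt" is illustrative).
    val rows = sc.textFile("data.txt").map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
    val mat = new RowMatrix(rows)

    val exact = mat.columnSimilarities()      // brute-force all-pairs cosine similarity
    val approx = mat.columnSimilarities(0.1)  // DIMSUM sampling with an illustrative threshold

    // Report mean absolute error of the estimate, treating missing entries as 0.
    val exactEntries = exact.entries.map(e => ((e.i, e.j), e.value))
    val approxEntries = approx.entries.map(e => ((e.i, e.j), e.value))
    val mae = exactEntries.leftOuterJoin(approxEntries).values.map {
      case (x, Some(y)) => math.abs(x - y)
      case (x, None) => math.abs(x)
    }.mean()
    println(s"Average absolute error in estimate: $mae")
    sc.stop()
  }
}
```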
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2647#issuecomment-57896055 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21286/consoleFull) for PR 2647 at commit [`5fc1259`](https://github.com/apache/spark/commit/5fc12597afe5964c7b9f688fd2919426b928b3ec). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2647#issuecomment-57896058 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21286/
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2647#issuecomment-57894903 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21286/consoleFull) for PR 2647 at commit [`5fc1259`](https://github.com/apache/spark/commit/5fc12597afe5964c7b9f688fd2919426b928b3ec). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2647#discussion_r18427632
--- Diff: project/SparkBuild.scala ---
@@ -99,6 +99,30 @@ object SparkBuild extends PomBuild {
     v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
   }
 
+  // NOTE: If you change the default version for each profile,
+  // also corresponding section in pom.xml should be cnahged.
--- End diff --
Thanks, I've fixed.
[GitHub] spark pull request: [SPARK-2098] All Spark processes should suppor...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2379#issuecomment-57894238 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21285/
[GitHub] spark pull request: [SPARK-2098] All Spark processes should suppor...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2379#issuecomment-57894236 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21285/consoleFull) for PR 2379 at commit [`80b0b12`](https://github.com/apache/spark/commit/80b0b12abd15620890523af2d455fef3902ad545). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...
Github user derrickburns commented on the pull request: https://github.com/apache/spark/pull/2634#issuecomment-57894117 I ran the style tests. They pass. Is there something else in the style guide that is not captured in the tests? I have expended much effort to avoid serializing unnecessary objects. I'm still perplexed why so much data is being captured in the closure that the test fails. Anyway, what are the next steps? Omit the test and approve the PR? Ask someone to help fix the code to avoid the unit test failure? Thx! Sent from my iPhone > On Oct 3, 2014, at 12:17 PM, Xiangrui Meng wrote: > > @derrickburns The *ClusterSuite was created to prevent referencing unnecessary objects in the task closure. You can try to remove Serializable from algorithms. While the models are serializable, the algorithm instances should stay on the driver node. If you want to use a member method in a task closure, either make it static or define it as a local method. If you want to use a member variable, assign it to a val first. > > This is something we can try. Avoiding serializing unnecessary objects is a good practice, but I'm not sure whether it is worth the effort. > > Btw, could you update your PR following the https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide ? Thanks!
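A minimal Scala sketch of the pattern Xiangrui describes (the class and field names are hypothetical, not the PR's code): referencing a field inside a closure drags the whole enclosing instance into it, while copying the field to a local val first keeps the closure small.

```scala
import org.apache.spark.rdd.RDD

class MyAlgorithm {  // hypothetical; deliberately not Serializable, stays on the driver
  val threshold = 0.5

  def run(data: RDD[Double]): RDD[Double] = {
    // Problematic: `threshold` is really `this.threshold`, so the closure
    // captures the entire MyAlgorithm instance and Spark must serialize it.
    //   data.filter(x => x > threshold)

    // Better: copy the field into a local val; only the Double is captured.
    val t = threshold
    data.filter(x => x > t)
  }
}
```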
[GitHub] spark pull request: [SPARK-2098] All Spark processes should suppor...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2379#issuecomment-57893391 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21285/consoleFull) for PR 2379 at commit [`80b0b12`](https://github.com/apache/spark/commit/80b0b12abd15620890523af2d455fef3902ad545). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2651#issuecomment-57893128 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21284/consoleFull) for PR 2651 at commit [`c4f5778`](https://github.com/apache/spark/commit/c4f57780ac03065ee8cecb71c3a950a68277f82f). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2651#issuecomment-57893129 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21284/
[GitHub] spark pull request: [SPARK-3762] clear reference of SparkEnv after...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2624#issuecomment-57892709 @JoshRosen the simple fix is to delete the threadlocal variable completely. Then any access to the threadlocal variable from any thread (even threadpool in Py4J) is going to be reset.
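An illustrative before/after of the semantics being discussed (SparkEnv's real fields differ; the class and member names here are hypothetical): a per-thread value set on the driver thread is never seen, or cleared, on Py4J's pooled callback threads, whereas a single shared reference resets for every thread at once.

```scala
// Before: thread-local storage; each thread sees (and keeps) its own value.
class EnvHolderBefore {
  private val env = new ThreadLocal[AnyRef]
  def set(e: AnyRef): Unit = env.set(e)
  def get: AnyRef = env.get()  // stale or null on Py4J's pooled threads
}

// After: the thread-local variable is deleted completely; clearing the one
// shared reference is immediately visible to all threads.
class EnvHolderAfter {
  @volatile private var env: AnyRef = null
  def set(e: AnyRef): Unit = { env = e }
  def get: AnyRef = env
  def clear(): Unit = { env = null }
}
```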
[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2651#issuecomment-57891695 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21284/consoleFull) for PR 2651 at commit [`c4f5778`](https://github.com/apache/spark/commit/c4f57780ac03065ee8cecb71c3a950a68277f82f). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/2651#issuecomment-57891571 /cc @davies @cocoatomo @robbles for reviews / feedback.
[GitHub] spark pull request: [SPARK-3772] Allow `ipython` to be used by Pys...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/2651
[SPARK-3772] Allow `ipython` to be used by Pyspark workers; IPython fixes:
This pull request addresses a few issues related to PySpark's IPython support:
- Fix the remaining uses of the '-u' flag, which IPython doesn't support (see SPARK-3772).
- Change PYSPARK_PYTHON_OPTS to PYSPARK_DRIVER_PYTHON_OPTS, so that the old name is reserved in case we ever want to allow the worker Python options to be customized (this variable was introduced in #2554 and hasn't landed in a release yet, so this doesn't break any compatibility).
- Retain the old semantics for IPYTHON=1 and IPYTHON_OPTS (to avoid breaking existing example programs).
There are more details in a large block comment in `bin/pyspark`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/JoshRosen/spark SPARK-3772
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2651.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2651
commit c4f57780ac03065ee8cecb71c3a950a68277f82f
Author: Josh Rosen
Date: 2014-10-03T22:43:16Z
[SPARK-3772] Allow ipython to be used by Pyspark workers; IPython fixes:
- Fix the remaining uses of the '-u' flag, which IPython doesn't support.
- Change PYSPARK_PYTHON_OPTS to PYSPARK_DRIVER_PYTHON_OPTS, so that the old name is reserved in case we ever want to allow the worker Python options to be customized.
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
GitHub user sarutak reopened a pull request: https://github.com/apache/spark/pull/2647
[SPARK-3787] Assembly jar name is wrong when we build with sbt omitting -Dhadoop.version
This PR is another solution for the following problem. When we build with sbt using a hadoop profile but without a property for the hadoop version, like:
sbt/sbt -Phadoop-2.2 assembly
the jar name always uses the default version (1.0.4). When we build with maven under the same conditions, the default version for each profile is used instead. For instance, if we build like:
mvn -Phadoop-2.2 package
the jar name uses hadoop2.2.0, the default version of the hadoop-2.2 profile.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sarutak/spark fix-assembly-jarname
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2647.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2647
commit eebbb7d03423efefcf1cdfb4bab8cbf7348f08a7
Author: Kousuke Saruta
Date: 2014-10-03T22:13:47Z
Fixed wrong jar name
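A sketch of the idea behind the fix; the version numbers and names below are illustrative, and per the NOTE in the PR's diff the real mapping lives in SparkBuild.scala and must stay in sync with pom.xml:

```scala
// Map each hadoop profile enabled via -P<profile> to the hadoop version that
// profile defaults to in pom.xml, so `sbt -Phadoop-2.2 assembly` names the
// assembly jar with 2.2.0 instead of the global default 1.0.4.
val defaultHadoopVersionForProfile = Map(
  "hadoop-0.23" -> "0.23.10",
  "hadoop-2.2" -> "2.2.0",
  "hadoop-2.3" -> "2.3.0",
  "hadoop-2.4" -> "2.4.0"
)

// Pick the first enabled profile that has a default; otherwise fall back to 1.0.4.
def hadoopVersion(enabledProfiles: Seq[String]): String =
  enabledProfiles.collectFirst(defaultHadoopVersionForProfile).getOrElse("1.0.4")
```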
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2650#issuecomment-57891486 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21282/
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user sarutak closed the pull request at: https://github.com/apache/spark/pull/2647
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2650#issuecomment-57891483 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21282/consoleFull) for PR 2650 at commit [`0e36be7`](https://github.com/apache/spark/commit/0e36be7f174d932c4acabf73b5ca8b76caf54a61). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2649#issuecomment-57891421 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21283/consoleFull) for PR 2649 at commit [`c938845`](https://github.com/apache/spark/commit/c938845534f2581c8761df507a671c48898db0fa).
* This patch **passes** unit tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)`
  * `case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)`
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2649#issuecomment-57891422 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21283/
[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2646#discussion_r18426767
--- Diff: python/pyspark/shuffle.py ---
@@ -428,7 +427,7 @@ def _recursive_merged_items(self, start):
         subdirs = [os.path.join(d, "parts", str(i)) for d in self.localdirs]
         m = ExternalMerger(self.agg, self.memory_limit, self.serializer,
-                           subdirs, self.scale * self.partitions)
+                           subdirs, self.scale * self.partitions, self.partitions)
--- End diff --
Good catch!
[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2646#discussion_r18426749
--- Diff: python/pyspark/tests.py ---
@@ -152,7 +152,7 @@ def test_external_sort(self):
         self.assertGreater(shuffle.DiskBytesSpilled, last)
 
     def test_external_sort_in_rdd(self):
-        conf = SparkConf().set("spark.python.worker.memory", "1m")
+        conf = SparkConf().set("spark.python.worker.memory", "10m")
--- End diff --
Why did this test originally change the worker memory? Is the goal here to force spilling? Maybe we could add an undocumented "always spill / always externalize" configuration option to force spilling irrespective of memory limits in order to test this code. I suppose that we still might want tests like this, though, to check that the memory usage monitoring is working correctly, although we could write a separate test that only tests the memory monitoring.
[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2646#discussion_r18426755
--- Diff: python/pyspark/tests.py ---
@@ -152,7 +152,7 @@ def test_external_sort(self):
         self.assertGreater(shuffle.DiskBytesSpilled, last)
 
     def test_external_sort_in_rdd(self):
-        conf = SparkConf().set("spark.python.worker.memory", "1m")
+        conf = SparkConf().set("spark.python.worker.memory", "10m")
--- End diff --
I ask because I wonder whether increasing this value will change the behavior of the test.
[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/2646#issuecomment-57890710 This is a great set of refactorings! Thanks for improving the consistency of the test suite names.
[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2646#discussion_r18426715
--- Diff: python/pyspark/tests.py ---
@@ -754,27 +756,19 @@ def test_serialize_nested_array_and_map(self):
         self.assertEqual("2", row.d)
 
-class TestIO(PySparkTestCase):
-
-    def test_stdout_redirection(self):
-        import subprocess
-
-        def func(x):
-            subprocess.check_call('ls', shell=True)
-        self.sc.parallelize([1]).foreach(func)
--- End diff --
Yeah, it looks like this test was added in 57b64d0d1902eb51bf79f595626c2b9f80a9d1e2 (part of the original PySpark PR) but I don't remember what it was for and agree that it's weird as written.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2650#issuecomment-57890533 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21281/
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2650#issuecomment-57890530 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21281/consoleFull) for PR 2650 at commit [`772aead`](https://github.com/apache/spark/commit/772aeada36bbc569597632360b086b54e7bb05f3). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: Event proration based on event timestamps.
Github user bijaybisht commented on the pull request: https://github.com/apache/spark/pull/2633#issuecomment-57890383 I don't understand why it failed this time. Can the test be re-triggered?
[GitHub] spark pull request: [SPARK-3677] [BUILD] [YARN] pom.xml and SparkB...
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/2520#issuecomment-5790 Hey, I will check this patch very soon. I have an impression that these changes to SparkBuild are not needed. Even if they are needed, then something needs to be fixed in the pom reader plugin.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2649#issuecomment-57888369 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21280/consoleFull) for PR 2649 at commit [`9f7b571`](https://github.com/apache/spark/commit/9f7b571aca22fed7ca85dab23e306bff5147cfc7). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2649#issuecomment-57888370 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21280/
[GitHub] spark pull request: [SPARK-3486][MLlib][PySpark] PySpark support f...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2356#issuecomment-57888189 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21279/consoleFull) for PR 2356 at commit [`a73fa19`](https://github.com/apache/spark/commit/a73fa19786bca754ecf8567bc83bdce1f90569ee).
* This patch **passes** unit tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)`
  * `case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)`
  * `class Word2VecModel(object):`
  * `class Word2Vec(object):`
[GitHub] spark pull request: [SPARK-3486][MLlib][PySpark] PySpark support f...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2356#issuecomment-57888194 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21279/
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2649#issuecomment-57888024 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21283/consoleFull) for PR 2649 at commit [`c938845`](https://github.com/apache/spark/commit/c938845534f2581c8761df507a671c48898db0fa). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2650#issuecomment-57888034 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21282/consoleFull) for PR 2650 at commit [`0e36be7`](https://github.com/apache/spark/commit/0e36be7f174d932c4acabf73b5ca8b76caf54a61). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57883154 @mengxr I'll be occupied next week but I'll try to respond asap to your feedback the week after
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2650#issuecomment-57882214 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21281/consoleFull) for PR 2650 at commit [`772aead`](https://github.com/apache/spark/commit/772aeada36bbc569597632360b086b54e7bb05f3). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
GitHub user vanzin opened a pull request: https://github.com/apache/spark/pull/2650
[SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federati...
...on (1.1 version). HA and federation use namespaces instead of host names, so you can't resolve them since that will fail. So be smarter to avoid doing unnecessary work.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/vanzin/spark SPARK-3788-1.1
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2650.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2650
commit 772aeada36bbc569597632360b086b54e7bb05f3
Author: Marcelo Vanzin
Date: 2014-10-03T23:47:00Z
[SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federation (1.1 version). HA and federation use namespaces instead of host names, so you can't resolve them since that will fail. So be smarter to avoid doing unnecessary work.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2649#issuecomment-57881327 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21280/consoleFull) for PR 2649 at commit [`9f7b571`](https://github.com/apache/spark/commit/9f7b571aca22fed7ca85dab23e306bff5147cfc7). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3486][MLlib][PySpark] PySpark support f...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2356#issuecomment-57881048 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21279/consoleFull) for PR 2356 at commit [`a73fa19`](https://github.com/apache/spark/commit/a73fa19786bca754ecf8567bc83bdce1f90569ee). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2646#issuecomment-57881070 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/264/consoleFull) for PR 2646 at commit [`6a2a4b0`](https://github.com/apache/spark/commit/6a2a4b02fa06881d1397146cc552a13de3c69a9c). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2649#issuecomment-57880944 I tested this on both regular and federated HDFS and verified that the "Upload foo..." message does not show up in the logs for either, while it would show up for federated HDFS before.
[GitHub] spark pull request: [SPARK-3788] [yarn] Fix compareFs to do the ri...
GitHub user vanzin opened a pull request: https://github.com/apache/spark/pull/2649
[SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federati...
...on. HA and federation use namespaces instead of host names, so you can't resolve them since that will fail. So be smarter to avoid doing unnecessary work.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/vanzin/spark SPARK-3788
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2649.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2649
commit 9f7b571aca22fed7ca85dab23e306bff5147cfc7
Author: Marcelo Vanzin
Date: 2014-10-03T23:30:25Z
[SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federation. HA and federation use namespaces instead of host names, so you can't resolve them since that will fail. So be smarter to avoid doing unnecessary work.
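Roughly, the idea (an illustrative sketch, not the actual compareFs change): compare the two filesystems' URIs structurally instead of resolving host names, since an HA or federated authority such as `hdfs://nameservice1` is a logical nameservice ID that DNS cannot resolve.

```scala
import java.net.URI
import org.apache.hadoop.fs.FileSystem

// Decide whether two filesystems are "the same" by comparing the scheme and
// authority of their URIs, with no DNS lookups involved.
def sameFileSystem(src: FileSystem, dst: FileSystem): Boolean = {
  val a: URI = src.getUri
  val b: URI = dst.getUri
  if (a.getScheme != b.getScheme) return false  // different schemes: never the same
  // The authority may be a real host or a nameservice ID; compare it as text.
  val authA = Option(a.getAuthority).map(_.toLowerCase)
  val authB = Option(b.getAuthority).map(_.toLowerCase)
  authA == authB
}
```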
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2647#issuecomment-57880751 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21278/consoleFull) for PR 2647 at commit [`eebbb7d`](https://github.com/apache/spark/commit/eebbb7d03423efefcf1cdfb4bab8cbf7348f08a7).
* This patch **passes** unit tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)`
  * `case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)`
  * `case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext)`
  * `case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode`
  * `case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode`
  * `case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)`
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2647#issuecomment-57880772 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21278/
[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2497#issuecomment-57880702 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21277/consoleFull) for PR 2497 at commit [`75cde8c`](https://github.com/apache/spark/commit/75cde8cea0ace51ed3204211453f5d5df01349f9). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2497#issuecomment-57880723 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21277/
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57874204 @erikerlandson I didn't check the test code. I will try to find another time to make a pass on the test. The implementation looks good to me except minor inline comments. Could you create a JIRA for `.drop(...)` in sampling and link it to the upstream Scala JIRA? So we will remember to update it later. Thanks!
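For readers following the thread, a minimal sketch of the gap-sampling technique (including the `.drop(...)` call mengxr refers to); this is illustrative, not the PR's GapSamplingIterator: for a small sampling fraction f, instead of drawing one uniform number per element, draw the size of the gap to the next accepted element directly from the geometric distribution and skip that many elements at once.

```scala
import scala.util.Random

def gapSample[T](items: Iterator[T], f: Double, rng: Random): Iterator[T] =
  new Iterator[T] {
    private val lnq = math.log1p(-f)             // ln(1 - f), for f in (0, 1)
    private var it = items
    private def skipGap(): Unit = {
      val u = math.max(rng.nextDouble(), 1e-10)  // guard against log(0)
      val gap = (math.log(u) / lnq).toInt        // geometric gap: P(k) = f * (1 - f)^k
      it = it.drop(gap)                          // the `.drop(...)` in question
    }
    skipGap()
    def hasNext: Boolean = it.hasNext
    def next(): T = { val v = it.next(); skipGap(); v }
  }
```

The expected number of random draws falls from one per input element to one per accepted element, which is where the speedup for small f comes from.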
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user jkleckner commented on a diff in the pull request: https://github.com/apache/spark/pull/2647#discussion_r18423582
--- Diff: project/SparkBuild.scala ---
@@ -99,6 +99,30 @@ object SparkBuild extends PomBuild {
     v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
   }
 
+  // NOTE: If you change the default version for each profile,
+  // also corresponding section in pom.xml should be cnahged.
--- End diff --
Typo cnahged -> changed
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423491
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")
 
-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom
 
   override def setSeed(seed: Long) = rng.setSeed(seed)
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        })
+      } else {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        })
+      }
     }
   }
 
   /**
    * Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+    new BernoulliPartitionSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
+}
 
-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps),
+    "Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.gsmDefault) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault)
+    } else {
+      items.filter(_ => (rng.nextDouble() <= fraction))
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }
 
+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
 
-  private[random] var rng = new Poisson(mean, new DRand)
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))
 
   override def setSeed(seed: Long) {
-    rng = new Poisson(mean, new DRand(seed.toInt))
+    curseed = seed
+    rng = new Poisson(fraction, new DRand(seed.toInt))
   }
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.flatMap { item =>
-      val count = rng.nextInt()
-      if (count == 0) {
-        Iterator.empty
-      } else {
-        Iterator.fill(count)(item)
-      }
+    if (fraction <= 0.0) {
+      Iterator.
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423498 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")

-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom

   override def setSeed(seed: Long) = rng.setSeed(seed)

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        })
+      } else {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        })
+      }
     }
   }

   /**
    * Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+    new BernoulliPartitionSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
+}

-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps),
+    "Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.gsmDefault) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault)
+    } else {
+      items.filter(_ => (rng.nextDouble() <= fraction))
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }

+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {

-  private[random] var rng = new Poisson(mean, new DRand)
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))

   override def setSeed(seed: Long) {
-    rng = new Poisson(mean, new DRand(seed.toInt))
+    curseed = seed
+    rng = new Poisson(fraction, new DRand(seed.toInt))
   }

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.flatMap { item =>
-      val count = rng.nextInt()
-      if (count == 0) {
-        Iterator.empty
-      } else {
-        Iterator.fill(count)(item)
-      }
+    if (fraction <= 0.0) {
+      Iterator.
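The `sample` method of the new `BernoulliSampler` quoted above is where the PR's optimization is wired in: degenerate fractions short-circuit, fractions at or below `RandomSampler.gsmDefault` route to the `GapSamplingIterator`, and larger fractions fall back to one Bernoulli trial per item. Here is a minimal self-contained sketch of that dispatch; the 0.4 threshold is an assumed stand-in for `gsmDefault` (its value is not visible in the quoted hunk), and a plain filter stands in for the gap iterator, a geometric-gap version of which appears further below.

import scala.util.Random

object BernoulliDispatchSketch {
  // Placeholder for the gap sampling path; behavior (not performance)
  // is equivalent to the geometric-gap sketch given further below.
  def gapSample[T](items: Iterator[T], f: Double, rng: Random): Iterator[T] =
    items.filter(_ => rng.nextDouble() <= f)

  // Mirrors the branch structure of BernoulliSampler.sample in the diff.
  def sample[T](items: Iterator[T], fraction: Double, rng: Random,
      gapSamplingMax: Double = 0.4): Iterator[T] = {
    if (fraction <= 0.0) {
      Iterator.empty                                   // sample nothing
    } else if (fraction >= 1.0) {
      items                                            // sample everything
    } else if (fraction <= gapSamplingMax) {
      gapSample(items, fraction, rng)                  // cheap path for small fractions
    } else {
      items.filter(_ => rng.nextDouble() <= fraction)  // one RNG draw per item
    }
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(1L)
    println(sample((1 to 20).iterator, 0.25, rng).toList)
  }
}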
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423493 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423500 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423504 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423499 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423485 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423495 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423492 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423489 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423437 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "tradtional" bernoulli sampling is faster. The

--- End diff --

bernoulli -> `Bernoulli`

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
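The code comment under review describes when gap sampling kicks in; the optimization itself exploits the fact that, for Bernoulli sampling with probability f, the number of rejected items between two accepted items is geometrically distributed, so the gap can be drawn directly with one RNG call per accepted item rather than one per input item. That is why it only pays off for small fractions, hence the `gsmDefault` threshold. Below is a minimal self-contained sketch of the technique, not the PR's `GapSamplingIterator`; the class name and epsilon guard are illustrative assumptions.

import scala.util.Random

// Gap sampling sketch: draw the gap to the next accepted element as
// k = floor(log(u) / log(1 - f)), u ~ Uniform(0, 1), i.e. k ~ Geometric(f).
class GapSamplingSketch[T](items: Iterator[T], f: Double, rng: Random)
  extends Iterator[T] {

  require(f > 0.0 && f < 1.0, "fraction must be in (0, 1)")

  private val lnQ = math.log(1.0 - f)
  private var nextItem: Option[T] = None

  private def advance(): Unit = {
    // Clamp u away from 0 to avoid log(0); the PR's iterator takes an
    // epsilon parameter (epsDefault) for the same reason.
    val u = math.max(rng.nextDouble(), 1e-10)
    var skip = (math.log(u) / lnQ).toInt
    while (skip > 0 && items.hasNext) { items.next(); skip -= 1 }
    nextItem = if (items.hasNext) Some(items.next()) else None
  }
  advance()

  override def hasNext: Boolean = nextItem.isDefined
  override def next(): T = {
    val v = nextItem.getOrElse(throw new NoSuchElementException)
    advance()
    v
  }
}

object GapSamplingDemo {
  def main(args: Array[String]): Unit = {
    // Sample roughly 5% of a million elements with ~50k RNG calls, not 1M.
    val sampled = new GapSamplingSketch((1 to 1000000).iterator, 0.05, new Random(7L))
    println(sampled.take(5).toList)
  }
}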
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423464 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423484 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423470 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423457 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423473 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same @@ -53,56 +89,238 @@ hunk as quoted in full above)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423487 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423468 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423448

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs

--- End diff --

use `RandomSampler.epsArgs` directly
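One reading of this suggestion (a sketch, assuming `epsArgs` remains a constant on the companion object): drop the cached field and reference the constant at each call site, which also removes the need for the `@transient` annotation.

```scala
require(lb <= ub + RandomSampler.epsArgs,
  "Lower bound (lb) must be <= upper bound (ub)")
require(lb >= 0.0 - RandomSampler.epsArgs, "Lower bound (lb) must be >= 0.0")
require(ub <= 1.0 + RandomSampler.epsArgs, "Upper bound (ub) must be <= 1.0")
```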
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423443

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.
+   */
+  def gsmDefault: Double = 0.4
+
+  /**
+   * Default gap sampling epsilon
+   * When sampling random floating point values the gap sampling logic requires value > 0. An

--- End diff --

What do you mean by `value`?
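The doc sentence under question presumably refers to the uniform draw consumed by the gap-length computation: the gap is a function of log(u), which diverges as the draw u approaches 0, so u must be clamped to a small positive epsilon. A hypothetical illustration:

```scala
val f = 0.1                                   // sampling fraction
val u = 0.0                                   // nextDouble() can return exactly 0.0
math.log(u) / math.log(1.0 - f)               // -Infinity / -0.105... => Infinity

val eps = 5e-11                               // clamp the draw away from zero
val gap = (math.log(math.max(u, eps)) / math.log(1.0 - f)).toInt  // finite (~225)
```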
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423433

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
+  /**
+   * Default gap sampling maximum

--- End diff --

add `.` to the end or insert an empty line. You can check the generated doc by `sbt/sbt unidoc`.
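The point of this nit: scaladoc, like javadoc, treats everything up to the first period as the one-line summary, so without it the whole paragraph is pulled into the summary. The fixed comment would look something like:

```scala
/**
 * Default gap sampling maximum.
 *
 * For sampling fractions <= this value, the gap sampling optimization will be applied.
 */
def gsmDefault: Double = 0.4
```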
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423478 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423454 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423461 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423474 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423463 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423479 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423453 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423459 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423475 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423477 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (diff context identical to the excerpt above; comment truncated in the archive)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423429

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
     throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi

--- End diff --

`@DeveloperApi` is not necessary for package private classes.
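`@DeveloperApi` documents an unstable but public API; a `private[spark]` object is not visible to users at all, so the annotation carries no information there. Applied to the declaration under review, the fix is simply to drop the annotation (sketch; the object body is elided, and the stray space in the modifier is tightened):

```scala
private[spark] object RandomSampler {
  /** Default random number generator used by random samplers. */
  def rngDefault: Random = new XORShiftRandom
  // ...
}
```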
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423444

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
+  /**
+   * Default gap sampling epsilon
+   * When sampling random floating point values the gap sampling logic requires value > 0. An
+   * optimal value for this parameter is at or near the minimum positive floating point value
+   * returned by nextDouble() for the RNG being used.
+   */
+  def epsDefault: Double = 5e-11

--- End diff --

The name `epsDefault` is not very clear to me. It could be a `val`.
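Both points could be addressed together, e.g. (the name here is only an illustration, not what the PR ultimately settled on):

```scala
/**
 * Default gap sampling epsilon.
 *
 * Lower bound on the uniform draw used by the gap sampling logic, chosen at or
 * near the smallest positive value nextDouble() can return for the RNG in use.
 */
val defaultGapSamplingEpsilon: Double = 5e-11
```

A `val` is evaluated once instead of on every access, which is the natural choice for a constant.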
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423440

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.
+   */
+  def gsmDefault: Double = 0.4
+
+  /**
+   * Default gap sampling epsilon
--- End diff --

ditto: add `.`
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423426

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -43,7 +43,8 @@ import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
-import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
+import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliPartitionSampler,
+    SamplingUtils}
--- End diff --

2-space indentation may be better (thinking of the case when the package name is really long).
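Concretely, the suggested 2-space continuation indent would wrap the import as:

~~~
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliPartitionSampler,
  SamplingUtils}
~~~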
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423449

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub  + eps), "Lower bound (lb) must be <= upper bound (ub)")
--- End diff --

Include the values of `lb` and `ub` in the message, and remove the extra space between `ub` and `+`.
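Folding both points together, the check might read as follows (a sketch of the reviewer's suggestion, with illustrative bound values to show the interpolated message):

~~~
// Hypothetical bounds, chosen to trip the check:
val (lb, ub, eps) = (0.4, 0.1, 5e-11)
require(lb <= ub + eps,
  s"Lower bound ($lb) must be <= upper bound ($ub)")
// throws: java.lang.IllegalArgumentException: requirement failed:
//   Lower bound (0.4) must be <= upper bound (0.1)
~~~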
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423438

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.
+   */
+  def gsmDefault: Double = 0.4
--- End diff --

We should be more specific on the name. `gsm` is not a common acronym (for sampling). I would recommend some names like `defaultMaxGapSamplingProb`. (This only applies to Bernoulli sampling but adding `Bernoulli` makes the name too long.) It could be a val: `val defaultMaxGapSamplingProb = 0.4`. (We don't need type info for primitive types.)
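Put together, the suggestions would turn the object into something like the sketch below. Only `defaultMaxGapSamplingProb` is the reviewer's own proposal; `defaultRNG` and `rngEpsilon` are hypothetical stand-ins for `rngDefault` and `epsDefault`, and `XORShiftRandom` is assumed importable from the same package:

~~~
import java.util.Random

private[spark] object RandomSampler {
  def defaultRNG: Random = new XORShiftRandom  // stays a def so each caller gets a fresh instance
  val defaultMaxGapSamplingProb = 0.4          // a val; no type annotation needed for primitives
  val rngEpsilon = 5e-11
}
~~~

The def/val split matters: the RNG must not be shared across samplers, while the two constants are plain values.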
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423427

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -375,7 +376,9 @@ abstract class RDD[T: ClassTag](
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
+      new PartitionwiseSampledRDD[T, T](this,
--- End diff --

The following style is commonly used in Spark:

~~~
new PartitionwiseSampledRDD[T, T](
  this, new BernoulliPartitionSampler[T](x(0), x(1)), true, seed)
~~~
[GitHub] spark pull request: [SPARK-3758] [Windows] Wrong EOL character in ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2612#issuecomment-57873138

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21276/
[GitHub] spark pull request: [SPARK-3758] [Windows] Wrong EOL character in ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2612#issuecomment-57873123

[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21276/consoleFull) for PR 2612 at commit [`91fb0fd`](https://github.com/apache/spark/commit/91fb0fdf1a3c4492853c0ecc9ffa9d723d369cd4).

* This patch **passes** unit tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)`
[GitHub] spark pull request: [Spark] RDD take() method: overestimate too mu...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2648#issuecomment-57872445

Can one of the admins verify this patch?
[GitHub] spark pull request: [Spark] RDD take() method: overestimate too mu...
GitHub user yingjieMiao opened a pull request: https://github.com/apache/spark/pull/2648

[Spark] RDD take() method: overestimate too much

The comment at line 1083 says: "Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%." So `(1.5 * num * partsScanned / buf.size).toInt` is an estimate of the *total* number of partitions needed. In each iteration we should therefore add only the increment `(1.5 * num * partsScanned / buf.size).toInt - partsScanned`. The existing implementation instead adds the whole estimate, which grows `partsScanned` exponentially: since `buf.size < num` while the loop continues, the estimate is at least `1.5 * partsScanned`, so roughly `x_{n+1} >= (1.5 + 1) * x_n = 2.5 * x_n`. This could be a performance problem (unless it is the intended behavior).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yingjieMiao/spark rdd_take

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2648.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2648

commit a2aa36b6838ff71941dab1d4af5c8e5f79fd4b4f
Author: yingjieMiao
Date: 2014-10-03T22:26:01Z

RDD take method: overestimate too much
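A worked example of the difference, with illustrative numbers only:

~~~
object TakeGrowthExample extends App {
  // Suppose take(num = 100) has scanned 4 partitions and buffered 10 rows so far.
  val num = 100
  val partsScanned = 4
  val bufSize = 10

  // Interpolated estimate of the *total* partitions needed, overestimated by 50%:
  val estimatedTotal = (1.5 * num * partsScanned / bufSize).toInt
  println(estimatedTotal)                // 60

  // Existing code adds the whole estimate on top of what was already scanned:
  println(partsScanned + estimatedTotal) // 64, a jump of 2.5x or more each round
  // Reading the estimate as a total, the next pass should stop at:
  println(estimatedTotal)                // 60
}
~~~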
[GitHub] spark pull request: [SPARK-3786] [PySpark] speedup tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2646#issuecomment-57871751

[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/264/consoleFull) for PR 2646 at commit [`6a2a4b0`](https://github.com/apache/spark/commit/6a2a4b02fa06881d1397146cc552a13de3c69a9c).

* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2647#issuecomment-57870821

[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21278/consoleFull) for PR 2647 at commit [`eebbb7d`](https://github.com/apache/spark/commit/eebbb7d03423efefcf1cdfb4bab8cbf7348f08a7).

* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3787] Assembly jar name is wrong when w...
GitHub user sarutak opened a pull request: https://github.com/apache/spark/pull/2647

[SPARK-3787] Assembly jar name is wrong when we build with sbt omitting -Dhadoop.version

This PR is another solution for the case where we build with sbt using a hadoop profile but without a hadoop version property, like:

{code}
sbt/sbt -Phadoop-2.2 assembly
{code}

Here the jar name always uses the default hadoop version (1.0.4). When we build with Maven under the same conditions, the default version of the chosen profile is used instead. For instance, if we build like:

{code}
mvn -Phadoop-2.2 package
{code}

the jar name uses 2.2.0, the default version of the hadoop-2.2 profile.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sarutak/spark fix-assembly-jarname

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2647.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2647

commit eebbb7d03423efefcf1cdfb4bab8cbf7348f08a7
Author: Kousuke Saruta
Date: 2014-10-03T22:13:47Z

Fixed wrong jar name
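(Until a fix lands, explicitly supplying the property, for example `sbt/sbt -Phadoop-2.2 -Dhadoop.version=2.2.0 assembly`, should produce the expected jar name; this assumes 2.2.0 is the version the hadoop-2.2 profile defaults to, as the PR description suggests, rather than anything verified in this thread.)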
[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2497#issuecomment-57869998

[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21277/consoleFull) for PR 2497 at commit [`75cde8c`](https://github.com/apache/spark/commit/75cde8cea0ace51ed3204211453f5d5df01349f9).

* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2497#discussion_r18421944

--- Diff: core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ---
@@ -148,14 +146,19 @@ private[spark] object JettyUtils extends Logging {
       holder.setClassName(filter)
       // Get any parameters for each filter
       val paramName = "spark." + filter + ".params"
-      val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
-      params.foreach {
+      val params = conf.get(paramName, "").split(',').map(_.trim()).foreach {
--- End diff --

Ah, I see what I did there. Scala is weird at times.
[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2497#discussion_r18421854

--- Diff: core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ---
@@ -148,14 +146,19 @@ private[spark] object JettyUtils extends Logging {
       holder.setClassName(filter)
       // Get any parameters for each filter
       val paramName = "spark." + filter + ".params"
-      val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
-      params.foreach {
+      val params = conf.get(paramName, "").split(',').map(_.trim()).foreach {
--- End diff --

Weird, how did you get the master one to work?
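The quirk being discussed: `foreach` returns `Unit`, so the rewritten line binds `()` to `params` rather than a collection, yet it still compiles. A minimal illustration:

~~~
object ForeachUnitExample extends App {
  val words = "a,b,c".split(',').map(_.trim)

  val asSet = words.toSet             // Set(a, b, c): a value that can be used later
  val asUnit = words.foreach(println) // asUnit: Unit = (); compiles, but binds nothing useful

  // If the result is never read again, dropping the val entirely is the clean fix:
  words.foreach(w => println(s"param: $w"))
}
~~~

Since the original code never used `params` after the `foreach`, removing the `val` appears to be the direction this diff is heading.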