[GitHub] spark pull request #17770: [SPARK-20392][SQL][WIP] Set barrier to prevent re...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17770#discussion_r113624301 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala --- @@ -72,6 +72,34 @@ object CurrentOrigin { } } +case class Barrier(node: Option[TreeNode[_]] = None) --- End diff -- My original thought was: if we use a barrier node, we need to modify many places where we create a new logical plan so that we can wrap it with the barrier node. I will revamp this with a barrier node.
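To make the trade-off discussed above concrete, here is a minimal, self-contained sketch (simplified types, not Spark's actual TreeNode/LogicalPlan API): a dedicated barrier node lets the analyzer skip an already-resolved subtree, at the cost of wrapping plans with the barrier wherever they are constructed.

```scala
// Toy plan hierarchy; names are illustrative only.
sealed trait Plan { def children: Seq[Plan] }
case class Relation(name: String) extends Plan { val children: Seq[Plan] = Nil }
case class Project(exprs: Seq[String], child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

// Wrapper-node approach: a single node type, wrapped around resolved subtrees once.
case class AnalysisBarrier(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

// A rule driver that stops descending at the barrier, so the wrapped subtree is never re-analyzed.
def analyze(plan: Plan): Plan = plan match {
  case b: AnalysisBarrier => b
  case p: Project         => p.copy(child = analyze(p.child))
  case other              => other
}

object BarrierDemo extends App {
  val alreadyResolved = AnalysisBarrier(Project(Seq("a"), Relation("t")))
  // Only the outer Project is visited; the barrier's child is left untouched.
  println(analyze(Project(Seq("b"), alreadyResolved)))
}
```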
[GitHub] spark issue #17781: [SPARK-20476] [SQL] Block users to create a table that u...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17781 **[Test build #76217 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76217/testReport)** for PR 17781 at commit [`9563de4`](https://github.com/apache/spark/commit/9563de4b54233f1f2148dac20178b7b8c4ff1f41).
[GitHub] spark pull request #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-g...
Github user gatorsmile closed the pull request at: https://github.com/apache/spark/pull/17776
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17776 Closing this in favor of another PR: https://github.com/apache/spark/pull/17781
[GitHub] spark pull request #17781: [SPARK-20476] [SQL] Block users to create a table...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/17781 [SPARK-20476] [SQL] Block users to create a table that use commas in the column names

### What changes were proposed in this pull request?

```SQL
hive> create table t1(`a,` string);
OK
Time taken: 1.399 seconds

hive> create table t2(`a,` string, b string);
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: MetaException(message:org.apache.hadoop.hive.serde2.SerDeException org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 3 elements while columns.types has 2 elements!)

hive> create table t2(`a,` string, b string) stored as parquet;
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.IllegalArgumentException: ParquetHiveSerde initialization failed. Number of column name and column type differs. columnNames = [a, , b], columnTypes = [string, string]
```

The commands above expose a bug in the Hive metastore. When users do not provide an alias name in the SELECT query, we call `toPrettySQL` to generate the alias name. For example, the string `get_json_object(jstring, '$.f1')` will be the alias name for the function call in the statement

```SQL
SELECT key, get_json_object(jstring, '$.f1') FROM tempView
```

The above is not an issue for SELECT query statements. However, for CTAS we hit the issue due to a bug in the Hive metastore: it does not accept column names containing commas and returns a confusing error message, like:

```
17/04/26 23:12:56 ERROR [hive.log(397) -- main]: error in initSerDe: org.apache.hadoop.hive.serde2.SerDeException org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements!
org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements!
```

Thus, this PR blocks users from creating a table in the Hive metastore when the table has a column whose name contains a comma.

### How was this patch tested?

Added a test case.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark blockIllegalColumnNames

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17781.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17781

commit 9563de4b54233f1f2148dac20178b7b8c4ff1f41
Author: Xiao Li
Date: 2017-04-27T06:24:39Z
fix.
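A rough sketch of the kind of check this PR describes, with hypothetical names (the real change lives in Spark's Hive catalog code, which is not shown here): validate column names up front and fail with a clear message instead of surfacing the metastore's confusing one.

```scala
// Hypothetical column representation and check; illustrative only.
case class ColumnSpec(name: String, dataType: String)

def verifyColumnNames(table: String, columns: Seq[ColumnSpec]): Unit = {
  val offending = columns.map(_.name).filter(_.contains(","))
  if (offending.nonEmpty) {
    throw new IllegalArgumentException(
      s"Cannot create table '$table' in the Hive metastore: column name(s) " +
        s"${offending.mkString("[", ", ", "]")} contain a comma.")
  }
}

// verifyColumnNames("t2", Seq(ColumnSpec("a,", "string"), ColumnSpec("b", "string")))
// would throw an explicit error instead of the SerDe element-count mismatch above.
```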
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17776 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76216/ Test FAILed.
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17776 Merged build finished. Test FAILed.
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17776 **[Test build #76216 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76216/testReport)** for PR 17776 at commit [`bcb8cf7`](https://github.com/apache/spark/commit/bcb8cf74bee92cb04c97f1d64254123df2fb393b). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #17736: [SPARK-20399][SQL] Can't use same regex pattern between ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/17736 ping @cloud-fan @hvanhovell
[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17768 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76215/ Test FAILed.
[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17768 Merged build finished. Test FAILed.
[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17768 **[Test build #76215 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76215/testReport)** for PR 17768 at commit [`b5c9839`](https://github.com/apache/spark/commit/b5c9839476fe0ddfe2a194afd89a68542c749a6f). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #17771: [SPARK-20471]Remove AggregateBenchmark testsuite warning...
Github user heary-cao commented on the issue: https://github.com/apache/spark/pull/17771 @hvanhovell thanks for reviewing it.
[GitHub] spark issue #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite verbose...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17780 Merged build finished. Test PASSed.
[GitHub] spark issue #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite verbose...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17780 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76214/ Test PASSed.
[GitHub] spark issue #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite verbose...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17780 **[Test build #76214 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76214/testReport)** for PR 17780 at commit [`768738c`](https://github.com/apache/spark/commit/768738c4503938dfb467f8cfd22ffe01bc098c07). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #17733: [SPARK-20425][SQL] Support a vertical display mod...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17733
[GitHub] spark pull request #17649: [SPARK-20380][SQL] Output table comment for DESC ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17649#discussion_r113616846 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala --- @@ -295,7 +295,9 @@ class InMemoryCatalog( assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) -catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition +val updatedProperties = tableDefinition.properties.filter(kv => kv._1 != "comment") +val newTableDefinition = tableDefinition.copy(properties = updatedProperties) +catalog(db).tables(tableDefinition.identifier.table).table = newTableDefinition --- End diff -- Thank you!
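As a side note for readers of the quoted diff (not part of the PR itself): on an immutable Scala Map the same property filtering can be written with the `-` operator, which reads more directly than a key-comparison filter.

```scala
val properties = Map("comment" -> "a test table", "owner" -> "alice")

// The diff's formulation...
val filtered = properties.filter { case (key, _) => key != "comment" }
// ...is equivalent to removing the key directly.
val removed = properties - "comment"

assert(filtered == removed)  // both drop only the "comment" entry
```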
[GitHub] spark issue #17714: [SPARK-20428][Core]REST interface about 'v1/submissions/...
Github user guoxiaolongzte commented on the issue: https://github.com/apache/spark/pull/17714 @HyukjinKwon Could you help with the code review? Thank you.
[GitHub] spark issue #17733: [SPARK-20425][SQL] Support a vertical display mode for D...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17733 LGTM
[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17768 Merged build finished. Test FAILed.
[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17768 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76213/ Test FAILed.
[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17768 **[Test build #76213 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76213/testReport)** for PR 17768 at commit [`b5c9839`](https://github.com/apache/spark/commit/b5c9839476fe0ddfe2a194afd89a68542c749a6f). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17735 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76211/ Test PASSed.
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17735 Merged build finished. Test PASSed.
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17735 **[Test build #76211 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76211/testReport)** for PR 17735 at commit [`db11db0`](https://github.com/apache/spark/commit/db11db050f378768a22520bd95b869c679638430). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #17596: [SPARK-12837][CORE] Do not send the name of internal acc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17596 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76209/ Test PASSed.
[GitHub] spark issue #17596: [SPARK-12837][CORE] Do not send the name of internal acc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17596 Merged build finished. Test PASSed.
[GitHub] spark issue #17596: [SPARK-12837][CORE] Do not send the name of internal acc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17596 **[Test build #76209 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76209/testReport)** for PR 17596 at commit [`0f028b1`](https://github.com/apache/spark/commit/0f028b1f79b2f76ae6c1ea2243b72f211961ad02). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #17778: Add array_unique UDF
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/17778 Hi @janewangfb, it looks like we need a JIRA, a better PR title, and a PR description. Please check out http://spark.apache.org/contributing.html.
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17776 @cloud-fan @sameeragarwal @hvanhovell @ueshin Should we just fix the issue in CTAS? Even if users provide commas in the alias names, should we simply remove them?
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17776 **[Test build #76216 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76216/testReport)** for PR 17776 at commit [`bcb8cf7`](https://github.com/apache/spark/commit/bcb8cf74bee92cb04c97f1d64254123df2fb393b).
[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17768 **[Test build #76215 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76215/testReport)** for PR 17768 at commit [`b5c9839`](https://github.com/apache/spark/commit/b5c9839476fe0ddfe2a194afd89a68542c749a6f).
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17776 When users do not provide an alias name in the SELECT query, we call `toPrettySQL` to generate the alias name. For example, the string `get_json_object(jstring, '$.f1')` will be the alias name for the function call in the statement

```SQL
SELECT key, get_json_object(jstring, '$.f1') FROM tempView
```

The above is not an issue for SELECT query statements. However, for CTAS we hit the issue due to a bug in the Hive metastore: it does not accept column names containing commas and returns a confusing error message, like:

```
17/04/26 23:12:56 ERROR [hive.log(397) -- main]: error in initSerDe: org.apache.hadoop.hive.serde2.SerDeException org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements!
org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements!
```

Thus, this PR removes the commas from the auto-generated alias names so that Spark SQL users can run CTAS over function calls whose pretty-printed form contains commas, even without user-given alias names. BTW, I also added this explanation to the PR description.
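A minimal sketch of the approach described above (`toPrettySQL` exists in Spark, but the sanitizing helper and its placement here are illustrative assumptions): derive the alias from the pretty-printed expression and strip the commas before it can reach the metastore.

```scala
// Illustrative only: strip commas from an auto-generated alias.
def sanitizedAlias(prettyExpr: String): String = prettyExpr.replace(",", "")

// For the example in the comment above:
println(sanitizedAlias("get_json_object(jstring, '$.f1')"))
// prints: get_json_object(jstring '$.f1')
```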
[GitHub] spark issue #17768: [WIP][SPARK-20465][CORE] Throws a proper exception when ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/17768 Current status: I don't know how using

```scala
val dirs = Array.empty[Int]
dirs.headOption.getOrElse {
  throw new Exception("")
}
```

instead of

```scala
val dirs = Array.empty[Int]
dirs(0)
```

could make the tests fail (given the observations above). In the last commit, I gave it a shot with if-else as below:

```scala
val dirs = Array.empty[Int]
if (dirs.nonEmpty) {
  dirs(0)
} else {
  throw new Exception("")
}
```

just to make sure; logically this should not make other tests flaky.
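For reference, a self-contained comparison of the three variants quoted above: all of them fail on an empty array, and only the exception type differs, which is why it is surprising that swapping them would change unrelated test outcomes.

```scala
import scala.util.Try

val dirs = Array.empty[Int]

// Direct indexing fails with ArrayIndexOutOfBoundsException; the other two throw the supplied exception.
println(Try(dirs(0)))
println(Try(dirs.headOption.getOrElse(throw new Exception("no local dirs"))))
println(Try(if (dirs.nonEmpty) dirs(0) else throw new Exception("no local dirs")))
```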
[GitHub] spark issue #17768: [WIP][SPARK-20465][CORE] Throws a proper exception when ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/17768 retest this please
[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17777 LGTM except two minor comments.
[GitHub] spark pull request #17777: [SPARK-20482][SQL] Resolving Casts is too strict ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17777#discussion_r113609686 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala --- @@ -165,6 +181,14 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String] override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = copy(timeZoneId = Option(timeZoneId)) + // When this cast involves TimeZone, it's only resolved if the timeZoneId is set; + // Otherwise behave like Expression.resolved. + override lazy val resolved: Boolean = +childrenResolved && checkInputDataTypes().isSuccess && +(!needsTimeZone || timeZoneId.isDefined) --- End diff -- Nit: It fits one line. Could you just move it to the same line: ```Scala childrenResolved && checkInputDataTypes().isSuccess && (!needsTimeZone || timeZoneId.isDefined) ```
[GitHub] spark pull request #17777: [SPARK-20482][SQL] Resolving Casts is too strict ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17777#discussion_r113609438 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala --- @@ -89,6 +89,22 @@ object Cast { case _ => false } + def needsTimeZone(from: DataType, to: DataType): Boolean = (from, to) match { --- End diff -- Nit: private? Could you write comments to explain why we only consider these type pairs?
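To illustrate the kind of explanatory comment being requested, here is a hypothetical version of such a check with stand-in types; the pairs and reasons below are examples of zone-dependent casts, not Spark's actual list.

```scala
object CastTimeZoneDemo {
  sealed trait DType
  case object StringT extends DType
  case object DateT extends DType
  case object TimestampT extends DType

  // Hypothetical, annotated shape of the requested comment; illustrative only.
  def needsTimeZone(from: DType, to: DType): Boolean = (from, to) match {
    case (StringT, TimestampT) => true  // parsing a timestamp string uses the session time zone
    case (TimestampT, StringT) => true  // rendering a timestamp as text uses the session time zone
    case (DateT, TimestampT)   => true  // midnight of a date is a zone-dependent instant
    case (TimestampT, DateT)   => true  // the calendar day of an instant depends on the zone
    case _                     => false // e.g. numeric-to-numeric casts never need a time zone
  }
}
```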
[GitHub] spark pull request #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite ...
GitHub user tejasapatil opened a pull request: https://github.com/apache/spark/pull/17780 [SPARK-20487][SQL] `HiveTableScan` node is quite verbose in explained plan ## What changes were proposed in this pull request? Changed `TreeNode.argString` to handle `CatalogTable` separately (otherwise it would call the default `toString` on the `CatalogTable`) ## How was this patch tested? - Expanded scope of existing unit test to ensure that verbose information is not present - Manual testing Before ``` scala> hc.sql(" SELECT * FROM my_table WHERE name = 'foo' ").explain(true) == Parsed Logical Plan == 'Project [*] +- 'Filter ('name = foo) +- 'UnresolvedRelation `my_table` == Analyzed Logical Plan == user_id: bigint, name: string, ds: string Project [user_id#13L, name#14, ds#15] +- Filter (name#14 = foo) +- SubqueryAlias my_table +- CatalogRelation CatalogTable( Database: default Table: my_table Owner: tejasp Created: Fri Apr 14 17:05:50 PDT 2017 Last Access: Wed Dec 31 16:00:00 PST 1969 Type: MANAGED Provider: hive Properties: [serialization.format=1] Statistics: 9223372036854775807 bytes Location: file:/tmp/warehouse/my_table Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Partition Provider: Catalog Partition Columns: [`ds`] Schema: root -- user_id: long (nullable = true) -- name: string (nullable = true) -- ds: string (nullable = true) ), [user_id#13L, name#14], [ds#15] == Optimized Logical Plan == Filter (isnotnull(name#14) && (name#14 = foo)) +- CatalogRelation CatalogTable( Database: default Table: my_table Owner: tejasp Created: Fri Apr 14 17:05:50 PDT 2017 Last Access: Wed Dec 31 16:00:00 PST 1969 Type: MANAGED Provider: hive Properties: [serialization.format=1] Statistics: 9223372036854775807 bytes Location: file:/tmp/warehouse/my_table Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Partition Provider: Catalog Partition Columns: [`ds`] Schema: root -- user_id: long (nullable = true) -- name: string (nullable = true) -- ds: string (nullable = true) ), [user_id#13L, name#14], [ds#15] == Physical Plan == *Filter (isnotnull(name#14) && (name#14 = foo)) +- HiveTableScan [user_id#13L, name#14, ds#15], CatalogRelation CatalogTable( Database: default Table: my_table Owner: tejasp Created: Fri Apr 14 17:05:50 PDT 2017 Last Access: Wed Dec 31 16:00:00 PST 1969 Type: MANAGED Provider: hive Properties: [serialization.format=1] Statistics: 9223372036854775807 bytes Location: file:/tmp/warehouse/my_table Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Partition Provider: Catalog Partition Columns: [`ds`] Schema: root -- user_id: long (nullable = true) -- name: string (nullable = true) -- ds: string (nullable = true) ), [user_id#13L, name#14], [ds#15] ``` After ``` scala> hc.sql(" SELECT * FROM my_table WHERE name = 'foo' ").explain(true) == Parsed Logical Plan == 'Project [*] +- 'Filter ('name = foo) +- 'UnresolvedRelation `my_table` == Analyzed Logical Plan == user_id: bigint, name: string, ds: string Project [user_id#13L, name#14, ds#15] +- Filter (name#14 = foo) +- SubqueryAlias my_table +- CatalogRelation `default`.`my_table`, [user_id#13L, name#14], [ds#15] == Optimized 
Logical Plan == Filter (isnotnull(name#14) && (name#14 = foo)) +- CatalogRelation `default`.`my_table`, [user_id#13L, name#14], [ds#15] == Physical Plan == *Filter (isnotnull(name#14) && (name#14 = foo)) +- HiveTableScan [user_id#13L, name#14, ds#15], CatalogRelation `default`.`my_table`, [user_id#13L, name#14], [ds#15] ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/tejasapatil/spark SPARK-20487_verbose_plan Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17780.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17780 commit 768738c4503938dfb467f8cfd22ffe01bc098c07 Author: Tejas Patil Date: 2017-04-27T03:03:56Z [SPARK-20487][SQL] `HiveTableScan` node is quite verbose in ex
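A simplified sketch of the change described in this PR (the real code edits `TreeNode.argString`; the types below are stand-ins): format a catalog table argument as a compact identifier instead of relying on its full multi-line `toString`.

```scala
// Stand-in for Spark's CatalogTable; only the fields needed for the demo.
case class CatalogTable(database: String, table: String) {
  def qualifiedName: String = s"`$database`.`$table`"
  override def toString: String = s"CatalogTable(\n  Database: $database\n  Table: $table\n  ...)"
}

// Special-case the table when building the node's argument string, keeping explain output compact.
def argString(args: Seq[Any]): String = args.map {
  case t: CatalogTable => t.qualifiedName
  case other           => other.toString
}.mkString(", ")

println(argString(Seq(CatalogTable("default", "my_table"), Seq("user_id#13L", "name#14"))))
// prints: `default`.`my_table`, List(user_id#13L, name#14)
```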
[GitHub] spark issue #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite verbose...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17780 **[Test build #76214 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76214/testReport)** for PR 17780 at commit [`768738c`](https://github.com/apache/spark/commit/768738c4503938dfb467f8cfd22ffe01bc098c07).
[GitHub] spark issue #17779: [DOCS][MINOR] Add missing since to SparkR repeat_string ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17779 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76212/ Test PASSed.
[GitHub] spark issue #17779: [DOCS][MINOR] Add missing since to SparkR repeat_string ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17779 Merged build finished. Test PASSed.
[GitHub] spark issue #17779: [DOCS][MINOR] Add missing since to SparkR repeat_string ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17779 **[Test build #76212 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76212/testReport)** for PR 17779 at commit [`e5c1415`](https://github.com/apache/spark/commit/e5c141511b35b17bc6ca3c3f1fcc302449cf4e8f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113605788 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. + * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, + s"Vocab size ($vocabSize) cannot be smaller than negative samples($negSamples)") +val seed = $(this.seed) +val initRandom = new XORShiftRandom(seed) + +val vectorSize = $(this.vectorSize) + +val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f) +val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f) + +val sc = input.context + +val vocabMapbc = sc.broadcast(vocabMap) +val unigramTablebc = sc.broadcast(uniTable) + +val window = $(windowSize) + +val digitSentences = input.flatMap{sentence => --- End diff -- I might be missing something, but what is meant by ``digitSentences``? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
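To make the cumulative-distribution idea in the quoted diff concrete, here is a tiny, self-contained illustration with toy counts (not the PR's exact code): word frequencies raised to the 0.75 power are turned into a cumulative distribution and expanded into a fixed-size table, so drawing a negative sample is a single array lookup.

```scala
import scala.util.Random

val counts = Array(10.0, 5.0, 1.0)                 // toy word frequencies, most frequent first
val weights = counts.map(math.pow(_, 0.75))        // the usual unigram^0.75 smoothing
val cumulative = weights.scanLeft(0.0)(_ + _).tail.map(_ / weights.sum)

// Table sized at a small multiple of the vocabulary, as in the diff's comment.
val tableSize = 20 * counts.length
val table = Array.tabulate(tableSize) { slot =>
  cumulative.indexWhere(slot.toDouble / tableSize < _)   // word index owning this slot
}

val rng = new Random(42L)
val negativeSample = table(rng.nextInt(tableSize))       // constant-time draw
println(s"sampled word index: $negativeSample")
```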
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17735 Merged build finished. Test PASSed.
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113605682 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. 
+ * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, + s"Vocab size ($vocabSize) cannot be smaller than negative samples($negSamples)") +val seed = $(this.seed) +val initRandom = new XORShiftRandom(seed) + +val vectorSize = $(this.vectorSize) + +val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f) +val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f) + +val sc = input.context + +val vocabMapbc = sc.broadcast(vocabMap) +val unigramTablebc = sc.broadcast(uniTable) + +val window = $(windowSize) + +val digitSentences = input.flatMap{sentence => + val wordIndexes = sentence.flatMap(vocabMapbc.value.get) + wordIndexes.grouped($(maxSentenceLength)).map(_.toArray) +}.repartition($(numPartitions)).cache() + +val learningRate = $(stepSize) + +val wordsPerPartition = totalWordCount / $(numPartitions) + +logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: $totalWordCount") + +for {iteration <- 1 to $(maxIter)} { + logInfo(s"Starting iteration: $iteration") + val iterationStartTime = System.nanoTime() + + val syn0bc = sc.broadcast(syn0Global) + val syn1bc = sc.broadcast(syn1Global) + + val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, iter) => +logInfo(s"Iteration: $iteration, Partition: $i_") +logInfo(s"Numerical lib class being used : ${blas.getClass.getName}") +val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ ((-iteration - 1) << 8)) +val contextWordPairs = iter.flatMap(generateContextWordPairs(_, window, random)) + +val groupedBatches = contextWordPairs.grouped(batchSize) + +val negLabels = 1.0f +:
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17735 **[Test build #76208 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76208/testReport)** for PR 17735 at commit [`98eb3dc`](https://github.com/apache/spark/commit/98eb3dcdd15d9c5649bbca7effe4f0077130c352). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #17768: [WIP][SPARK-20465][CORE] Throws a proper exception when ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17768 **[Test build #76213 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76213/testReport)** for PR 17768 at commit [`b5c9839`](https://github.com/apache/spark/commit/b5c9839476fe0ddfe2a194afd89a68542c749a6f).
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113605031 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. 
+ * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, + s"Vocab size ($vocabSize) cannot be smaller than negative samples($negSamples)") +val seed = $(this.seed) +val initRandom = new XORShiftRandom(seed) + +val vectorSize = $(this.vectorSize) + +val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f) +val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f) + +val sc = input.context + +val vocabMapbc = sc.broadcast(vocabMap) +val unigramTablebc = sc.broadcast(uniTable) + +val window = $(windowSize) + +val digitSentences = input.flatMap{sentence => + val wordIndexes = sentence.flatMap(vocabMapbc.value.get) + wordIndexes.grouped($(maxSentenceLength)).map(_.toArray) +}.repartition($(numPartitions)).cache() + +val learningRate = $(stepSize) + +val wordsPerPartition = totalWordCount / $(numPartitions) + +logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: $totalWordCount") + +for {iteration <- 1 to $(maxIter)} { + logInfo(s"Starting iteration: $iteration") + val iterationStartTime = System.nanoTime() + + val syn0bc = sc.broadcast(syn0Global) + val syn1bc = sc.broadcast(syn1Global) + + val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, iter) => +logInfo(s"Iteration: $iteration, Partition: $i_") +logInfo(s"Numerical lib class being used : ${blas.getClass.getName}") +val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ ((-iteration - 1) << 8)) +val contextWordPairs = iter.flatMap(generateContextWordPairs(_, window, random)) + +val groupedBatches = contextWordPairs.grouped(batchSize) + +val negLabels = 1.0f +:
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113605000 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. 
+ * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, + s"Vocab size ($vocabSize) cannot be smaller than negative samples($negSamples)") +val seed = $(this.seed) +val initRandom = new XORShiftRandom(seed) + +val vectorSize = $(this.vectorSize) + +val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f) +val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f) + +val sc = input.context + +val vocabMapbc = sc.broadcast(vocabMap) +val unigramTablebc = sc.broadcast(uniTable) + +val window = $(windowSize) + +val digitSentences = input.flatMap{sentence => + val wordIndexes = sentence.flatMap(vocabMapbc.value.get) + wordIndexes.grouped($(maxSentenceLength)).map(_.toArray) +}.repartition($(numPartitions)).cache() + +val learningRate = $(stepSize) + +val wordsPerPartition = totalWordCount / $(numPartitions) + +logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: $totalWordCount") + +for {iteration <- 1 to $(maxIter)} { + logInfo(s"Starting iteration: $iteration") + val iterationStartTime = System.nanoTime() + + val syn0bc = sc.broadcast(syn0Global) + val syn1bc = sc.broadcast(syn1Global) + + val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, iter) => +logInfo(s"Iteration: $iteration, Partition: $i_") +logInfo(s"Numerical lib class being used : ${blas.getClass.getName}") +val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ ((-iteration - 1) << 8)) +val contextWordPairs = iter.flatMap(generateContextWordPairs(_, window, random)) + +val groupedBatches = contextWordPairs.grouped(batchSize) + +val negLabels = 1.0f +:
[GitHub] spark issue #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Words mode...
Github user Krimit commented on the issue: https://github.com/apache/spark/pull/17673 @shubhamchopra have you run this code in a distributed spark cluster yet? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17776 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user wesm commented on the issue: https://github.com/apache/spark/pull/15821 Note: we are shooting for an Arrow RC in Monday time frame, so with luck we'll have a release cut next week --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17776 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76207/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @budde would you like to share your thoughts on the new changes when you have time? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17776 **[Test build #76207 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76207/testReport)** for PR 17776 at commit [`24f37f1`](https://github.com/apache/spark/commit/24f37f1df3350b3e9977f617c07febeff16cba6c). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class ComplexTypesSuite extends PlanTest` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17779: [DOCS][MINOR] Add missing since to SparkR repeat_string ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17779 **[Test build #76212 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76212/testReport)** for PR 17779 at commit [`e5c1415`](https://github.com/apache/spark/commit/e5c141511b35b17bc6ca3c3f1fcc302449cf4e8f). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113603661 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. 
+ * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, + s"Vocab size ($vocabSize) cannot be smaller than negative samples($negSamples)") +val seed = $(this.seed) +val initRandom = new XORShiftRandom(seed) + +val vectorSize = $(this.vectorSize) + +val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f) +val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f) + +val sc = input.context + +val vocabMapbc = sc.broadcast(vocabMap) +val unigramTablebc = sc.broadcast(uniTable) + +val window = $(windowSize) + +val digitSentences = input.flatMap{sentence => + val wordIndexes = sentence.flatMap(vocabMapbc.value.get) + wordIndexes.grouped($(maxSentenceLength)).map(_.toArray) +}.repartition($(numPartitions)).cache() + +val learningRate = $(stepSize) + +val wordsPerPartition = totalWordCount / $(numPartitions) + +logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: $totalWordCount") + +for {iteration <- 1 to $(maxIter)} { + logInfo(s"Starting iteration: $iteration") + val iterationStartTime = System.nanoTime() + + val syn0bc = sc.broadcast(syn0Global) + val syn1bc = sc.broadcast(syn1Global) + + val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, iter) => +logInfo(s"Iteration: $iteration, Partition: $i_") +logInfo(s"Numerical lib class being used : ${blas.getClass.getName}") +val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ ((-iteration - 1) << 8)) +val contextWordPairs = iter.flatMap(generateContextWordPairs(_, window, random)) + +val groupedBatches = contextWordPairs.grouped(batchSize) + +val negLabels = 1.0f +:
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113603493 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. 
+ * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, + s"Vocab size ($vocabSize) cannot be smaller than negative samples($negSamples)") +val seed = $(this.seed) +val initRandom = new XORShiftRandom(seed) + +val vectorSize = $(this.vectorSize) + +val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f) +val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f) + +val sc = input.context + +val vocabMapbc = sc.broadcast(vocabMap) +val unigramTablebc = sc.broadcast(uniTable) + +val window = $(windowSize) + +val digitSentences = input.flatMap{sentence => + val wordIndexes = sentence.flatMap(vocabMapbc.value.get) + wordIndexes.grouped($(maxSentenceLength)).map(_.toArray) +}.repartition($(numPartitions)).cache() + +val learningRate = $(stepSize) + +val wordsPerPartition = totalWordCount / $(numPartitions) + +logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: $totalWordCount") + +for {iteration <- 1 to $(maxIter)} { + logInfo(s"Starting iteration: $iteration") + val iterationStartTime = System.nanoTime() + + val syn0bc = sc.broadcast(syn0Global) + val syn1bc = sc.broadcast(syn1Global) + + val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, iter) => +logInfo(s"Iteration: $iteration, Partition: $i_") +logInfo(s"Numerical lib class being used : ${blas.getClass.getName}") --- End diff -- Are these logs really necessary? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastr
[GitHub] spark pull request #17779: [DOCS][MINOR] Add missing since to SparkR repeat_...
GitHub user zero323 opened a pull request: https://github.com/apache/spark/pull/17779 [DOCS][MINOR] Add missing since to SparkR repeat_string note. ## What changes were proposed in this pull request? Replace @note repeat_string 2.3.0 with @note repeat_string since 2.3.0 ## How was this patch tested? `create-docs.sh` You can merge this pull request into a Git repository by running: $ git pull https://github.com/zero323/spark REPEAT-NOTE Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17779.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17779 commit e5c141511b35b17bc6ca3c3f1fcc302449cf4e8f Author: zero323 Date: 2017-04-27T02:20:56Z Add missing since to repeat note --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17777 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17777 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76205/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17777 **[Test build #76205 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76205/testReport)** for PR 17777 at commit [`27ec604`](https://github.com/apache/spark/commit/27ec604f23ff76e943c395ead144538480ef51ed). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113603142 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. + * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, + s"Vocab size ($vocabSize) cannot be smaller than negative samples($negSamples)") +val seed = $(this.seed) +val initRandom = new XORShiftRandom(seed) + +val vectorSize = $(this.vectorSize) + +val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f) +val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f) + +val sc = input.context + +val vocabMapbc = sc.broadcast(vocabMap) +val unigramTablebc = sc.broadcast(uniTable) + +val window = $(windowSize) + +val digitSentences = input.flatMap{sentence => + val wordIndexes = sentence.flatMap(vocabMapbc.value.get) + wordIndexes.grouped($(maxSentenceLength)).map(_.toArray) --- End diff -- I think you need to broadcast ``maxSentenceLength`` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
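A minimal sketch of the capture pattern under discussion (illustrative only, with hypothetical names; this is not the PR's code): reading the param once on the driver and letting the task closure capture the resulting `Int` keeps the closure small, since calling `$(maxSentenceLength)` inside the closure would pull the whole estimator into every task; broadcasting, as suggested above, is the heavier-weight option normally reserved for large values such as the vocabulary map.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Sketch only: maxSentenceLength arrives as a plain Int read from the param on the
// driver, so the closure captures just that Int and the broadcast vocabulary,
// not the enclosing estimator instance.
def chunkSentences(
    sentences: RDD[Iterable[String]],
    vocabBc: Broadcast[Map[String, Int]],
    maxSentenceLength: Int): RDD[Array[Int]] = {
  sentences.flatMap { sentence =>
    val indices = sentence.flatMap(vocabBc.value.get)   // drop out-of-vocabulary words
    indices.grouped(maxSentenceLength).map(_.toArray)   // split overly long sentences
  }
}
```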
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/15821 Updated to work with the latest Arrow to prepare for 0.3 release (tests should fail because that artifact is not yet available). Also improved consistency of ArrowConverters and did some cleanup. From @rxin's comments: > Move ArrowConverters.scala somewhere else that's not top level, e.g. execution.arrow It is now in the o.a.s.sql.execution.arrow package > Update this to arrow 0.3 Ready to do this, should just need to update the pom again > Use SQLConf rather than a parameter for toPandas. I removed this flag and used the conf "spark.sql.execution.arrow.enable" which defaults to "false" > Handle failure gracefully if arrow is not installed (or somehow package it with Spark?) It would be difficult to package with Spark, I think, because pyarrow also depends on the native Arrow cpp library. I changed it to fail gracefully if pyarrow is not available. The error message is: ``` ImportError: No module named pyarrow note: pyarrow must be installed and available on calling Python process if using spark.sql.execution.arrow.enable=true ``` > How are the memory managed? Who allocates the memory for the arrow records, and who's responsible for releasing them? The Java side of Arrow requires using a BufferAllocator class that manages the allocated memory. An instance of this must be used each time an ArrowRecordBatch is created and then the batch and allocator must be released/closed after they have been processed. This is all handled in the `ArrowConverter` functions. On the Python side, buffers are allocated from the Arrow cpp library and cleaned up when reference counts to the objects are zero. The end user does not have to worry about managing any memory. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
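To make the allocate-then-release discipline described above concrete, here is a rough, hedged sketch of the driver/executor-side lifecycle (not code from this PR; it assumes only Arrow's Java `RootAllocator`, the stock `BufferAllocator` implementation): the allocator is created first, record batches are built and closed against it, and the allocator itself is closed last so it can report any off-heap memory that was never released.

```scala
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}

// Sketch of the ownership pattern only: everything allocated against `allocator`
// (e.g. the buffers backing an ArrowRecordBatch) must be closed before the
// allocator is, otherwise allocator.close() flags the leak.
def withAllocator[T](limitBytes: Long)(body: BufferAllocator => T): T = {
  val allocator: BufferAllocator = new RootAllocator(limitBytes)
  try {
    body(allocator)
  } finally {
    allocator.close()
  }
}

// Hypothetical usage: build, consume, and close record batches inside the scope.
// withAllocator(Long.MaxValue) { allocator => /* convert rows, then close batches */ }
```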
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113602908 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. + * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, + s"Vocab size ($vocabSize) cannot be smaller than negative samples($negSamples)") +val seed = $(this.seed) +val initRandom = new XORShiftRandom(seed) + +val vectorSize = $(this.vectorSize) + +val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f) +val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f) + +val sc = input.context + +val vocabMapbc = sc.broadcast(vocabMap) --- End diff -- nit: ``vocabMapBroadcast``, same for ``unigramTable`` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17735 **[Test build #76211 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76211/testReport)** for PR 17735 at commit [`db11db0`](https://github.com/apache/spark/commit/db11db050f378768a22520bd95b869c679638430). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113602640 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) --- End diff -- No need to return ``vocabMap.size``, that information is duplicated from ``vocabMap`` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113602559 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. + * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { +val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input) +val negSamples = $(negativeSamples) +assert(negSamples < vocabSize, --- End diff -- Is this assertion truly needed? What about just ``negSamples = min($(negSamples) - 1, vocabSize)`` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113602341 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. + * @param input + * @return + */ + private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = { --- End diff -- I think the CBOW logic should definitely live outside of this class. I'm even wondering if it's worthwhile to update the mllib Word2Vec to have access to this as well. @MLnick @srowen thoughts? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17777 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76204/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17777 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17777 **[Test build #76204 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76204/testReport)** for PR 17777 at commit [`76debfb`](https://github.com/apache/spark/commit/76debfb9bd82c63bc354a71d95ab84db0f609fa3). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113602046 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. + * @param input + * @return --- End diff -- nit: fill these out --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
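For illustration, a filled-out version of those tags might read roughly as follows (a suggestion only, not text from the PR; the summary wording is kept and the parameter/return descriptions are inferred from the surrounding code).

```scala
  /**
   * This method implements Word2Vec Continuous Bag Of Words based implementation using
   * negative sampling optimization, using BLAS for vectorizing operations where applicable.
   * The algorithm is parallelized in the same way as the skip-gram based estimation.
   *
   * @param input RDD of tokenized sentences, each sentence an iterable of word strings
   * @return a trained `feature.Word2VecModel` holding the learned word vectors
   */
  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel
```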
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113601993 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention +// and use binary search to get insertion points. This should replicate the same +// behavior as the table in original implementation. +val weights = vocab.map(x => scala.math.pow(x._2, power)) +val totalWeight = weights.sum + +val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight)) + +val unigramTableSize = + math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length) +val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize) + +(vocabMap.size, totalWordCount, vocabMap, unigramTable) + } + + /** + * This method implements Word2Vec Continuous Bag Of Words based implementation using + * negative sampling optimization, using BLAS for vectorizing operations where applicable. + * The algorithm is parallelized in the same way as the skip-gram based estimation. --- End diff -- Nit: It's worthwhile to describe how this is parallelized. What happens if the skip-gram implementation changes? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15821 **[Test build #76210 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76210/testReport)** for PR 15821 at commit [`b6fe733`](https://github.com/apache/spark/commit/b6fe733955d6e153722b1945c09ed663d8ed9be2). * This patch **fails build dependency tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15821 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76210/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15821 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113601799 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) --- End diff -- maybe call this ``wordCounts`` or something like that to make it clearer? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15821 **[Test build #76210 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76210/testReport)** for PR 15821 at commit [`b6fe733`](https://github.com/apache/spark/commit/b6fe733955d6e153722b1945c09ed663d8ed9be2). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113601656 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => + w -> i +}.toMap + +// We create a cumulative distribution array, unlike the original implemention --- End diff -- I'm curious, why ``unlike the original implemention``? Why not follow the original? (also, typo, should be ``implementation``) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
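For reference, a small self-contained sketch of the cumulative-distribution approach the comment is asking about (illustrative only, not the PR's code): the original C implementation draws negative samples by indexing a large precomputed table, whereas binary-searching a monotonically increasing array of cumulative, normalized weights yields the same distribution in O(log V) per draw; `cumWeights` below stands in for the PR's `normalizedCumWeights`.

```scala
import java.util.Arrays
import scala.util.Random

// Sketch: sample an index proportionally to its weight by binary-searching the CDF.
// cumWeights is assumed monotonically increasing with a final value of ~1.0.
def sampleIndex(cumWeights: Array[Double], rng: Random): Int = {
  val u = rng.nextDouble()
  val pos = Arrays.binarySearch(cumWeights, u)
  // When u is not an exact element, binarySearch returns -(insertionPoint) - 1.
  if (pos >= 0) pos else -(pos + 1)
}

// Example: unnormalized weights 3, 2, 1 give the CDF (0.5, 0.8333, 1.0), so
// sampleIndex(Array(0.5, 0.8333, 1.0), new Random(42)) returns 0 about half the time.
```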
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113601513 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} + .collect() + .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2} + +val totalWordCount = vocab.map(_._2).sum + +val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) => --- End diff -- ``vocab.keySet.zipWithIndex`` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
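As a sketch of the simplification being suggested, with the caveat that `vocab` is an `Array` of `(word, count)` pairs in the quoted diff (not a `Map`), so the words have to be projected out before zipping. The sample data is illustrative only.

```scala
// Illustrative data; the real `vocab` comes from the RDD aggregation in the quoted diff.
val vocab: Array[(String, Long)] = Array(("the", 120L), ("cat", 40L), ("sat", 12L))

// Zip only the words with their index instead of destructuring (word, count) pairs.
val vocabMap: Map[String, Int] = vocab.map(_._1).zipWithIndex.toMap
// Map(the -> 0, cat -> 1, sat -> 2)
```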
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113600926 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ArrowConvertersSuite.scala --- @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import java.io.File +import java.nio.charset.StandardCharsets +import java.sql.{Date, Timestamp} +import java.text.SimpleDateFormat +import java.util.Locale + +import com.google.common.io.Files +import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot} +import org.apache.arrow.vector.file.json.JsonFileReader +import org.apache.arrow.vector.util.Validator +import org.json4s.jackson.JsonMethods._ +import org.json4s.JsonAST._ +import org.json4s.JsonDSL._ +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.SparkException +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + + +class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { + import testImplicits._ + + private var tempDataPath: String = _ + + private def collectAsArrow(df: DataFrame, + converter: Option[ArrowConverters] = None): ArrowPayload = { +val cnvtr = converter.getOrElse(new ArrowConverters) +val payloadByteArrays = df.toArrowPayloadBytes().collect() +cnvtr.readPayloadByteArrays(payloadByteArrays) + } + + override def beforeAll(): Unit = { +super.beforeAll() +tempDataPath = Utils.createTempDir(namePrefix = "arrow").getAbsolutePath + } + + test("collect to arrow record batch") { +val indexData = (1 to 6).toDF("i") +val arrowPayload = collectAsArrow(indexData) +assert(arrowPayload.nonEmpty) +val arrowBatches = arrowPayload.toArray +assert(arrowBatches.length == indexData.rdd.getNumPartitions) +val rowCount = arrowBatches.map(batch => batch.getLength).sum +assert(rowCount === indexData.count()) +arrowBatches.foreach(batch => assert(batch.getNodes.size() > 0)) +arrowBatches.foreach(batch => batch.close()) + } + + test("numeric type conversion") { +collectAndValidate(indexData) +collectAndValidate(shortData) +collectAndValidate(intData) +collectAndValidate(longData) +collectAndValidate(floatData) +collectAndValidate(doubleData) + } + + test("mixed numeric type conversion") { +collectAndValidate(mixedNumericData) + } + + test("boolean type conversion") { +collectAndValidate(boolData) + } + + test("string type conversion") { +collectAndValidate(stringData) + } + + test("byte type conversion") { +collectAndValidate(byteData) + } + + ignore("timestamp conversion") { +collectAndValidate(timestampData) + } + + // TODO: Not currently supported in Arrow JSON reader + ignore("date conversion") { +// collectAndValidate(dateTimeData) + } + + // TODO: Not currently 
supported in Arrow JSON reader + ignore("binary type conversion") { +// collectAndValidate(binaryData) + } + + test("floating-point NaN") { +collectAndValidate(floatNaNData) + } + + test("partitioned DataFrame") { +val converter = new ArrowConverters +val schema = testData2.schema +val arrowPayload = collectAsArrow(testData2, Some(converter)) +val arrowBatches = arrowPayload.toArray +// NOTE: testData2 should have 2 partitions -> 2 arrow batches in payload +assert(arrowBatches.length === 2) +val pl1 = new ArrowStaticPayload(arrowBatches(0)) +val pl2 = new ArrowStaticPayload(arrowBatches(1)) +// Generate JSON files +val a = List[Int](1, 1, 2, 2, 3, 3) +val b = List[Int](1, 2, 1, 2, 1, 2) +val fields
[GitHub] spark pull request #17191: [SPARK-14471][SQL] Aliases in SELECT could be use...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/17191#discussion_r113600909 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -998,6 +998,22 @@ class Analyzer( } /** + * Replace unresolved expressions in grouping keys with resolved ones in SELECT clauses. --- End diff -- ok, I'll update --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17191: [SPARK-14471][SQL] Aliases in SELECT could be use...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/17191#discussion_r113600878 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -998,6 +998,22 @@ class Analyzer( } /** + * Replace unresolved expressions in grouping keys with resolved ones in SELECT clauses. + */ + object ResolveAggAliasInGroupBy extends Rule[LogicalPlan] { + +override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case agg @ Aggregate(groups, aggs, child) + if conf.groupByAliases && child.resolved && aggs.forall(_.resolved) && +groups.exists(!_.resolved) => --- End diff -- ok --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17733: [SPARK-20425][SQL] Support a vertical display mode for D...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/17733 @gatorsmile Could you check again? Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113600716 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { +val sc = input.context + +val words = input.flatMap(x => x) + +val vocab = words.map(w => (w, 1L)) + .reduceByKey(_ + _) + .filter{case (w, c) => c >= $(minCount)} --- End diff -- You should broadcast ``$(minCount)`` to avoid shipping the entire class to each task. (and make sure to destroy it at the end of the method) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
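A minimal sketch of that pattern, with illustrative names: read the Param once on the driver, broadcast the value so the task closure captures a small `Broadcast[Int]` rather than the whole estimator, and destroy the broadcast after the action has completed.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch only; `collectFrequentWords` and its arguments are assumptions, not the PR's API.
def collectFrequentWords(sc: SparkContext,
                         counts: RDD[(String, Long)],
                         minCount: Int): Array[(String, Long)] = {
  val minCountBc = sc.broadcast(minCount)                  // ship just the Int, not `this`
  val frequent = counts
    .filter { case (_, c) => c >= minCountBc.value }
    .collect()                                             // action runs before destroy
  minCountBc.destroy()
  frequent
}
```

For a single `Int`, copying `$(minCount)` into a local `val` before building the closure would also keep the enclosing class out of the task closure; broadcasting matters more as the shipped state grows.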
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113600548 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) + } + a += 1 +} +table + } + + private def generateVocab[S <: Iterable[String]](input: RDD[S]): + (Int, Long, Map[String, Int], Array[Int]) = { --- End diff -- This could use a named class. It's not immediately clear what each of these is --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
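Something like the following case class would make the return type self-describing; the field names here are guesses based on the tuple's contents, not names from the PR.

```scala
// Hypothetical named result type for generateVocab; field names are illustrative.
case class VocabSummary(
    vocabSize: Int,               // number of distinct words kept after minCount filtering
    totalWordCount: Long,         // total number of tokens covered by the vocabulary
    vocabMap: Map[String, Int],   // word -> index into the vocabulary
    wordCounts: Array[Int])       // per-word counts, positioned by vocabMap's indices
```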
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113600552 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala --- @@ -0,0 +1,432 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.channels.{Channels, SeekableByteChannel} + +import scala.collection.JavaConverters._ + +import io.netty.buffer.ArrowBuf +import org.apache.arrow.memory.{BaseAllocator, RootAllocator} +import org.apache.arrow.vector._ +import org.apache.arrow.vector.BaseValueVector.BaseMutator +import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter} +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch} +import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +/** + * ArrowReader requires a seekable byte channel. 
+ * TODO: This is available in arrow-vector now with ARROW-615, to be included in 0.2.1 release + */ +private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte]) + extends SeekableByteChannel { + var _position: Long = 0L + + override def isOpen: Boolean = { +byteArray != null + } + + override def close(): Unit = { +byteArray = null + } + + override def read(dst: ByteBuffer): Int = { +val remainingBuf = byteArray.length - _position +val length = Math.min(dst.remaining(), remainingBuf).toInt +dst.put(byteArray, _position.toInt, length) +_position += length +length + } + + override def position(): Long = _position + + override def position(newPosition: Long): SeekableByteChannel = { +_position = newPosition.toLong +this + } + + override def size: Long = { +byteArray.length.toLong + } + + override def write(src: ByteBuffer): Int = { +throw new UnsupportedOperationException("Read Only") + } + + override def truncate(size: Long): SeekableByteChannel = { +throw new UnsupportedOperationException("Read Only") + } +} + +/** + * Intermediate data structure returned from Arrow conversions + */ +private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch] + +/** + * Build a payload from existing ArrowRecordBatches + */ +private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload { + private val iter = batches.iterator + override def next(): ArrowRecordBatch = iter.next() + override def hasNext: Boolean = iter.hasNext +} + +/** + * Class that wraps an Arrow RootAllocator used in conversion + */ +private[sql] class ArrowConverters { + private val _allocator = new RootAllocator(Long.MaxValue) + + private[sql] def allocator: RootAllocator = _allocator + + /** + * Iterate over the rows and convert to an ArrowPayload, using RootAllocator from this class + */ + def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = { +val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, _allocator) +new ArrowStaticPayload(batch) + } + + /** + * Read an Array of Arrow Record batches as byte Arrays into an ArrowPayload, using + * RootAllocator from this class + */ + def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = { +val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch] +var i = 0 +while (i < payloadByteArrays.length) { + val payloadBytes = payloadByteArrays(i) + val in = new ByteArrayReadableS
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113600492 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala --- @@ -0,0 +1,432 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.channels.{Channels, SeekableByteChannel} + +import scala.collection.JavaConverters._ + +import io.netty.buffer.ArrowBuf +import org.apache.arrow.memory.{BaseAllocator, RootAllocator} +import org.apache.arrow.vector._ +import org.apache.arrow.vector.BaseValueVector.BaseMutator +import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter} +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch} +import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +/** + * ArrowReader requires a seekable byte channel. 
+ * TODO: This is available in arrow-vector now with ARROW-615, to be included in 0.2.1 release + */ +private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte]) + extends SeekableByteChannel { + var _position: Long = 0L + + override def isOpen: Boolean = { +byteArray != null + } + + override def close(): Unit = { +byteArray = null + } + + override def read(dst: ByteBuffer): Int = { +val remainingBuf = byteArray.length - _position +val length = Math.min(dst.remaining(), remainingBuf).toInt +dst.put(byteArray, _position.toInt, length) +_position += length +length + } + + override def position(): Long = _position + + override def position(newPosition: Long): SeekableByteChannel = { +_position = newPosition.toLong +this + } + + override def size: Long = { +byteArray.length.toLong + } + + override def write(src: ByteBuffer): Int = { +throw new UnsupportedOperationException("Read Only") + } + + override def truncate(size: Long): SeekableByteChannel = { +throw new UnsupportedOperationException("Read Only") + } +} + +/** + * Intermediate data structure returned from Arrow conversions + */ +private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch] + +/** + * Build a payload from existing ArrowRecordBatches + */ +private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload { + private val iter = batches.iterator + override def next(): ArrowRecordBatch = iter.next() + override def hasNext: Boolean = iter.hasNext +} + +/** + * Class that wraps an Arrow RootAllocator used in conversion + */ +private[sql] class ArrowConverters { + private val _allocator = new RootAllocator(Long.MaxValue) + + private[sql] def allocator: RootAllocator = _allocator + + /** + * Iterate over the rows and convert to an ArrowPayload, using RootAllocator from this class + */ + def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = { +val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, _allocator) +new ArrowStaticPayload(batch) + } + + /** + * Read an Array of Arrow Record batches as byte Arrays into an ArrowPayload, using + * RootAllocator from this class + */ + def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = { +val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch] +var i = 0 +while (i < payloadByteArrays.length) { + val payloadBytes = payloadByteArrays(i) + val in = new ByteArrayReadableS
[GitHub] spark pull request #17767: [SPARK-20468] Refactor the ALS code
Github user danielyli closed the pull request at: https://github.com/apache/spark/pull/17767 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113600390 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala --- @@ -0,0 +1,432 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.channels.{Channels, SeekableByteChannel} + +import scala.collection.JavaConverters._ + +import io.netty.buffer.ArrowBuf +import org.apache.arrow.memory.{BaseAllocator, RootAllocator} +import org.apache.arrow.vector._ +import org.apache.arrow.vector.BaseValueVector.BaseMutator +import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter} +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch} +import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +/** + * ArrowReader requires a seekable byte channel. 
+ * TODO: This is available in arrow-vector now with ARROW-615, to be included in 0.2.1 release + */ +private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte]) + extends SeekableByteChannel { + var _position: Long = 0L + + override def isOpen: Boolean = { +byteArray != null + } + + override def close(): Unit = { +byteArray = null + } + + override def read(dst: ByteBuffer): Int = { +val remainingBuf = byteArray.length - _position +val length = Math.min(dst.remaining(), remainingBuf).toInt +dst.put(byteArray, _position.toInt, length) +_position += length +length + } + + override def position(): Long = _position + + override def position(newPosition: Long): SeekableByteChannel = { +_position = newPosition.toLong +this + } + + override def size: Long = { +byteArray.length.toLong + } + + override def write(src: ByteBuffer): Int = { +throw new UnsupportedOperationException("Read Only") + } + + override def truncate(size: Long): SeekableByteChannel = { +throw new UnsupportedOperationException("Read Only") + } +} + +/** + * Intermediate data structure returned from Arrow conversions + */ +private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch] + +/** + * Build a payload from existing ArrowRecordBatches + */ +private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload { + private val iter = batches.iterator + override def next(): ArrowRecordBatch = iter.next() + override def hasNext: Boolean = iter.hasNext +} + +/** + * Class that wraps an Arrow RootAllocator used in conversion + */ +private[sql] class ArrowConverters { + private val _allocator = new RootAllocator(Long.MaxValue) + + private[sql] def allocator: RootAllocator = _allocator + + /** + * Iterate over the rows and convert to an ArrowPayload, using RootAllocator from this class + */ + def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = { +val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, _allocator) +new ArrowStaticPayload(batch) + } + + /** + * Read an Array of Arrow Record batches as byte Arrays into an ArrowPayload, using + * RootAllocator from this class + */ + def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = { +val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch] +var i = 0 +while (i < payloadByteArrays.length) { + val payloadBytes = payloadByteArrays(i) + val in = new ByteArrayReadableS
[GitHub] spark issue #17767: [SPARK-20468] Refactor the ALS code
Github user danielyli commented on the issue: https://github.com/apache/spark/pull/17767 Closing this PR as per [SPARK-20468](https://issues.apache.org/jira/browse/SPARK-20468?focusedCommentId=15984365&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15984365). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113600388 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 +while (a < table.length) { + table.update(a, i) + if (a.toFloat / table.length >= normalizedWeights(i)) { +i = math.min(normalizedWeights.length - 1, i + 1) --- End diff -- An ``if else`` here would be much more legible --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
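Sketched out, the method with the `math.min` clamp spelled out as an explicit `if`/`else` might read like this; it is intended to be behavior-preserving under the same inputs as the quoted diff, not the PR's final code.

```scala
// Same logic as the quoted diff, with the clamp written as an if/else.
def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = {
  val table = Array.fill(tableSize)(0)
  var a = 0
  var i = 0
  while (a < table.length) {
    table.update(a, i)
    if (a.toFloat / table.length >= normalizedWeights(i)) {
      if (i + 1 < normalizedWeights.length) {
        i += 1                              // move on to the next word's bucket
      } else {
        i = normalizedWeights.length - 1    // already at the last word; stay clamped
      }
    }
    a += 1
  }
  table
}
```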
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113600264 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. + * We sacrifice memory here, to get constant time lookups into this array when generating + * negative samples. + */ + private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = { +val table = Array.fill(tableSize)(0) +var a = 0 +var i = 0 --- End diff -- Could you please use more descriptive variable names here? I was expecting ``i`` to be the index into the table array --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
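One possible renaming, keeping the quoted structure but making the roles of the two counters explicit; the names and the tiny sample inputs are suggestions only.

```scala
val normalizedWeights: Array[Double] = Array(0.4, 0.7, 1.0)   // illustrative cumulative weights
val table = Array.fill(normalizedWeights.length * 20)(0)      // 20x vocab size, per the quoted doc comment

var tablePos = 0   // slot in the unigram table currently being filled
var wordIdx = 0    // word whose cumulative weight covers that slot
while (tablePos < table.length) {
  table.update(tablePos, wordIdx)
  if (tablePos.toFloat / table.length >= normalizedWeights(wordIdx)) {
    wordIdx = math.min(normalizedWeights.length - 1, wordIdx + 1)
  }
  tablePos += 1
}
```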
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113600189 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala --- @@ -0,0 +1,432 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.channels.{Channels, SeekableByteChannel} + +import scala.collection.JavaConverters._ + +import io.netty.buffer.ArrowBuf +import org.apache.arrow.memory.{BaseAllocator, RootAllocator} +import org.apache.arrow.vector._ +import org.apache.arrow.vector.BaseValueVector.BaseMutator +import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter} +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch} +import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +/** + * ArrowReader requires a seekable byte channel. 
+ * TODO: This is available in arrow-vector now with ARROW-615, to be included in 0.2.1 release + */ +private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte]) + extends SeekableByteChannel { + var _position: Long = 0L + + override def isOpen: Boolean = { +byteArray != null + } + + override def close(): Unit = { +byteArray = null + } + + override def read(dst: ByteBuffer): Int = { +val remainingBuf = byteArray.length - _position +val length = Math.min(dst.remaining(), remainingBuf).toInt +dst.put(byteArray, _position.toInt, length) +_position += length +length + } + + override def position(): Long = _position + + override def position(newPosition: Long): SeekableByteChannel = { +_position = newPosition.toLong +this + } + + override def size: Long = { +byteArray.length.toLong + } + + override def write(src: ByteBuffer): Int = { +throw new UnsupportedOperationException("Read Only") + } + + override def truncate(size: Long): SeekableByteChannel = { +throw new UnsupportedOperationException("Read Only") + } +} + +/** + * Intermediate data structure returned from Arrow conversions + */ +private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch] + +/** + * Build a payload from existing ArrowRecordBatches + */ +private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload { + private val iter = batches.iterator + override def next(): ArrowRecordBatch = iter.next() + override def hasNext: Boolean = iter.hasNext +} + +/** + * Class that wraps an Arrow RootAllocator used in conversion + */ +private[sql] class ArrowConverters { + private val _allocator = new RootAllocator(Long.MaxValue) + + private[sql] def allocator: RootAllocator = _allocator + + /** + * Iterate over the rows and convert to an ArrowPayload, using RootAllocator from this class + */ + def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = { +val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, _allocator) +new ArrowStaticPayload(batch) + } + + /** + * Read an Array of Arrow Record batches as byte Arrays into an ArrowPayload, using + * RootAllocator from this class + */ + def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = { +val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch] +var i = 0 +while (i < payloadByteArrays.length) { + val payloadBytes = payloadByteArrays(i) + val in = new ByteArrayReadableS
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113600062 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala --- @@ -0,0 +1,432 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.channels.{Channels, SeekableByteChannel} + +import scala.collection.JavaConverters._ + +import io.netty.buffer.ArrowBuf +import org.apache.arrow.memory.{BaseAllocator, RootAllocator} +import org.apache.arrow.vector._ +import org.apache.arrow.vector.BaseValueVector.BaseMutator +import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter} +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch} +import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +/** + * ArrowReader requires a seekable byte channel. + * TODO: This is available in arrow-vector now with ARROW-615, to be included in 0.2.1 release + */ +private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte]) + extends SeekableByteChannel { + var _position: Long = 0L + + override def isOpen: Boolean = { +byteArray != null + } + + override def close(): Unit = { +byteArray = null + } + + override def read(dst: ByteBuffer): Int = { +val remainingBuf = byteArray.length - _position +val length = Math.min(dst.remaining(), remainingBuf).toInt +dst.put(byteArray, _position.toInt, length) +_position += length +length + } + + override def position(): Long = _position + + override def position(newPosition: Long): SeekableByteChannel = { +_position = newPosition.toLong +this + } + + override def size: Long = { +byteArray.length.toLong + } + + override def write(src: ByteBuffer): Int = { +throw new UnsupportedOperationException("Read Only") + } + + override def truncate(size: Long): SeekableByteChannel = { +throw new UnsupportedOperationException("Read Only") + } +} + +/** + * Intermediate data structure returned from Arrow conversions + */ +private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch] --- End diff -- No, I think this should be changed a little. `ArrowPayload` is meant to encapsulate Arrow classes from the rest of Spark and wrap Arrow data to extend `serializable` to allow an `RDD[ArrowPayload]`. I'll push an update that will clean this up. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
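For readers skimming the thread, the direction described above can be pictured roughly as follows; the class shape and field name are assumptions for illustration, not the API that eventually landed in the PR.

```scala
// Sketch: keep only serialized Arrow record-batch bytes in the payload so an
// RDD[ArrowPayload] can be shipped between JVMs without exposing live Arrow
// objects to the rest of Spark.
class ArrowPayload(val batchBytes: Array[Byte]) extends Serializable
```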
[GitHub] spark issue #17596: [SPARK-12837][CORE] Do not send the name of internal acc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17596 **[Test build #76209 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76209/testReport)** for PR 17596 at commit [`0f028b1`](https://github.com/apache/spark/commit/0f028b1f79b2f76ae6c1ea2243b72f211961ad02). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...
Github user Krimit commented on a diff in the pull request: https://github.com/apache/spark/pull/17673#discussion_r113599808 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala --- @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.1") override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra) + + /** + * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million + * like the original, we size it to be 20 times the vocabulary size. --- End diff -- why 20 times the vocabulary size? Also, this comment is misplaced. It belongs with the ``unigramTableSizeFactor`` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
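A sketch of what attaching the sizing rationale to the constant itself could look like; `unigramTableSizeFactor` comes from the reviewer's wording, the value 20 from the quoted doc comment, and the enclosing object is purely illustrative.

```scala
object Word2VecDefaults {
  /**
   * Size the unigram table as unigramTableSizeFactor * vocabSize rather than the
   * fixed 1e8 entries used by the original C code, trading extra memory for
   * constant-time lookups when drawing negative samples.
   */
  val unigramTableSizeFactor: Int = 20
}
```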