[GitHub] spark pull request #17770: [SPARK-20392][SQL][WIP] Set barrier to prevent re...

2017-04-26 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17770#discussion_r113624301
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala 
---
@@ -72,6 +72,34 @@ object CurrentOrigin {
   }
 }
 
+case class Barrier(node: Option[TreeNode[_]] = None)
--- End diff --

My original thought was: if we use a barrier node, we need to modify many 
places where we create a new logical plan, in order to wrap it with the barrier node.

I will revamp it with a barrier node.
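
For readers following the discussion, here is a toy, self-contained sketch of the trade-off mentioned above. `Plan`, `Scan`, `Project`, and `Barrier` are hypothetical stand-ins for the Catalyst classes, not the code in this PR:

```scala
// Toy sketch only: a barrier is a pass-through node wrapped around an
// already-analyzed child so that analysis rules stop at the wrapper instead of
// re-visiting the child.
sealed trait Plan { def children: Seq[Plan] }
case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }
case class Project(cols: Seq[String], child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
case class Barrier(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

// The cost mentioned above: every call site that used to build
//   Project(Seq("a"), analyzedChild)
// must now remember to build
//   Project(Seq("a"), Barrier(analyzedChild))
```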



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17781: [SPARK-20476] [SQL] Block users to create a table that u...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17781
  
**[Test build #76217 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76217/testReport)**
 for PR 17781 at commit 
[`9563de4`](https://github.com/apache/spark/commit/9563de4b54233f1f2148dac20178b7b8c4ff1f41).





[GitHub] spark pull request #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-g...

2017-04-26 Thread gatorsmile
Github user gatorsmile closed the pull request at:

https://github.com/apache/spark/pull/17776





[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17776
  
Closing this in favor of another PR: https://github.com/apache/spark/pull/17781





[GitHub] spark pull request #17781: [SPARK-20476] [SQL] Block users to create a table...

2017-04-26 Thread gatorsmile
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/17781

[SPARK-20476] [SQL] Block users to create a table that use commas in the 
column names

### What changes were proposed in this pull request?
```SQL
hive> create table t1(`a,` string);
OK
Time taken: 1.399 seconds

hive> create table t2(`a,` string, b string);
FAILED: Execution Error, return code 1 from 
org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: 
MetaException(message:org.apache.hadoop.hive.serde2.SerDeException 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 3 elements 
while columns.types has 2 elements!)

hive> create table t2(`a,` string, b string) stored as parquet;
FAILED: Execution Error, return code 1 from 
org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.IllegalArgumentException: 
ParquetHiveSerde initialization failed. Number of column name and column type 
differs. columnNames = [a, , b], columnTypes = [string, string]
```
This is due to a bug in the Hive metastore. 

When users do not provide an alias name in the SELECT query, we call 
`toPrettySQL` to generate one. For example, the string 
`get_json_object(jstring, '$.f1')` will be the alias name for the function call 
in the statement 
```SQL
SELECT key, get_json_object(jstring, '$.f1') FROM tempView
``` 
The above is not an issue for SELECT query statements. However, for CTAS 
we hit the issue due to a bug in the Hive metastore: it does not accept 
column names containing commas and returns a confusing error message, like:
```
17/04/26 23:12:56 ERROR [hive.log(397) -- main]: error in initSerDe: 
org.apache.hadoop.hive.serde2.SerDeException 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
```

Thus, this PR blocks users from creating a table in the Hive metastore when 
the table has a column whose name contains a comma. 
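
As a rough illustration, here is a minimal sketch of the kind of check described above. The helper name and the plain exception type are assumptions for the sketch, not the actual patch:

```scala
// Minimal sketch, not the actual patch: reject any column whose name contains a
// comma before the table definition reaches the Hive metastore, instead of
// letting the metastore surface the confusing SerDe errors shown above.
def verifyNoCommaInColumnNames(columnNames: Seq[String]): Unit = {
  columnNames.filter(_.contains(",")).foreach { name =>
    throw new IllegalArgumentException(
      s"Cannot create a table in the Hive metastore: column name '$name' contains a comma")
  }
}

// verifyNoCommaInColumnNames(Seq("key", "get_json_object(jstring, '$.f1')"))
// ==> IllegalArgumentException for the second column
```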

### How was this patch tested?
Added a test case

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark blockIllegalColumnNames

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17781.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17781


commit 9563de4b54233f1f2148dac20178b7b8c4ff1f41
Author: Xiao Li 
Date:   2017-04-27T06:24:39Z

fix.







[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17776
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76216/
Test FAILed.





[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17776
  
Merged build finished. Test FAILed.





[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17776
  
**[Test build #76216 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76216/testReport)**
 for PR 17776 at commit 
[`bcb8cf7`](https://github.com/apache/spark/commit/bcb8cf74bee92cb04c97f1d64254123df2fb393b).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #17736: [SPARK-20399][SQL] Can't use same regex pattern between ...

2017-04-26 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/17736
  
ping @cloud-fan @hvanhovell 





[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17768
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76215/
Test FAILed.





[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17768
  
Merged build finished. Test FAILed.





[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17768
  
**[Test build #76215 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76215/testReport)**
 for PR 17768 at commit 
[`b5c9839`](https://github.com/apache/spark/commit/b5c9839476fe0ddfe2a194afd89a68542c749a6f).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #17771: [SPARK-20471]Remove AggregateBenchmark testsuite warning...

2017-04-26 Thread heary-cao
Github user heary-cao commented on the issue:

https://github.com/apache/spark/pull/17771
  
@hvanhovell thanks for reviewing it.





[GitHub] spark issue #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite verbose...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17780
  
Merged build finished. Test PASSed.





[GitHub] spark issue #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite verbose...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17780
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76214/
Test PASSed.





[GitHub] spark issue #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite verbose...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17780
  
**[Test build #76214 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76214/testReport)**
 for PR 17780 at commit 
[`768738c`](https://github.com/apache/spark/commit/768738c4503938dfb467f8cfd22ffe01bc098c07).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request #17733: [SPARK-20425][SQL] Support a vertical display mod...

2017-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17733





[GitHub] spark pull request #17649: [SPARK-20380][SQL] Output table comment for DESC ...

2017-04-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17649#discussion_r113616846
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 ---
@@ -295,7 +295,9 @@ class InMemoryCatalog(
 assert(tableDefinition.identifier.database.isDefined)
 val db = tableDefinition.identifier.database.get
 requireTableExists(db, tableDefinition.identifier.table)
-catalog(db).tables(tableDefinition.identifier.table).table = 
tableDefinition
+val updatedProperties = tableDefinition.properties.filter(kv => kv._1 
!= "comment")
+val newTableDefinition = tableDefinition.copy(properties = 
updatedProperties)
+catalog(db).tables(tableDefinition.identifier.table).table = 
newTableDefinition
--- End diff --

Thank you!





[GitHub] spark issue #17714: [SPARK-20428][Core]REST interface about 'v1/submissions/...

2017-04-26 Thread guoxiaolongzte
Github user guoxiaolongzte commented on the issue:

https://github.com/apache/spark/pull/17714
  
@HyukjinKwon Please help with the code review, thank you.





[GitHub] spark issue #17733: [SPARK-20425][SQL] Support a vertical display mode for D...

2017-04-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17733
  
LGTM 





[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17768
  
Merged build finished. Test FAILed.





[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17768
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76213/
Test FAILed.





[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17768
  
**[Test build #76213 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76213/testReport)**
 for PR 17768 at commit 
[`b5c9839`](https://github.com/apache/spark/commit/b5c9839476fe0ddfe2a194afd89a68542c749a6f).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17735
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76211/
Test PASSed.





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17735
  
Merged build finished. Test PASSed.





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17735
  
**[Test build #76211 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76211/testReport)**
 for PR 17735 at commit 
[`db11db0`](https://github.com/apache/spark/commit/db11db050f378768a22520bd95b869c679638430).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #17596: [SPARK-12837][CORE] Do not send the name of internal acc...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17596
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76209/
Test PASSed.





[GitHub] spark issue #17596: [SPARK-12837][CORE] Do not send the name of internal acc...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17596
  
Merged build finished. Test PASSed.





[GitHub] spark issue #17596: [SPARK-12837][CORE] Do not send the name of internal acc...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17596
  
**[Test build #76209 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76209/testReport)**
 for PR 17596 at commit 
[`0f028b1`](https://github.com/apache/spark/commit/0f028b1f79b2f76ae6c1ea2243b72f211961ad02).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #17778: Add array_unique UDF

2017-04-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/17778
  
Hi @janewangfb, it looks like we need a JIRA, a better PR title, and a PR 
description. Please check out http://spark.apache.org/contributing.html.





[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17776
  
@cloud-fan @sameeragarwal @hvanhovell @ueshin 

Should we just fix the issue in CTAS? Even if users provide commas in the 
alias names, should we simply remove them?





[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17776
  
**[Test build #76216 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76216/testReport)**
 for PR 17776 at commit 
[`bcb8cf7`](https://github.com/apache/spark/commit/bcb8cf74bee92cb04c97f1d64254123df2fb393b).





[GitHub] spark issue #17768: [SPARK-20465][CORE] Throws a proper exception when any t...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17768
  
**[Test build #76215 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76215/testReport)**
 for PR 17768 at commit 
[`b5c9839`](https://github.com/apache/spark/commit/b5c9839476fe0ddfe2a194afd89a68542c749a6f).





[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17776
  
When users do not provide an alias name in the SELECT query, we call 
`toPrettySQL` to generate one.

For example, the string `get_json_object(jstring, '$.f1')` will be the alias 
name for the function call in the statement 
```SQL
SELECT key, get_json_object(jstring, '$.f1') FROM tempView
``` 

The above is not an issue for SELECT query statements. However, for CTAS 
we hit the issue due to a bug in the Hive metastore: it does not accept 
column names containing commas and returns a confusing error message, like:
```
17/04/26 23:12:56 ERROR [hive.log(397) -- main]: error in initSerDe: 
org.apache.hadoop.hive.serde2.SerDeException 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
```

Thus, this PR removes the comma from the auto-generated alias names, so that 
Spark SQL users can run CTAS over function calls containing commas without 
having to supply their own alias names. 
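
As a rough sketch of this approach (the helper name is hypothetical, not the actual patch), the comma is simply dropped from the auto-generated alias before it can become a column name:

```scala
// Hypothetical helper, for illustration only: strip commas from the alias that
// toPrettySQL would otherwise produce, so the Hive metastore never sees a
// column name containing a comma.
def toCommaSafeAlias(prettySql: String): String = prettySql.replace(",", "")

// toCommaSafeAlias("get_json_object(jstring, '$.f1')")
// ==> "get_json_object(jstring '$.f1')"
```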

BTW, I also added this description to the PR description.





[GitHub] spark issue #17768: [WIP][SPARK-20465][CORE] Throws a proper exception when ...

2017-04-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/17768
  
Current status: I don't know how using

```scala
val dirs = Array.empty[Int]
dirs.headOption.getOrElse {
   throw new Exception("")
}
```

instead of 

```scala
val dirs = Array.empty[Int]
dirs(0)
```

could make the tests fail (given the observations above).
In the last commit, I gave it a shot with if-else as below:

```scala
val dirs = Array.empty[Int]
if (dirs.nonEmpty) {
  dirs(0)
} else {
  throw new Exception("")
}
```

I want to make sure that, logically, this should not make other tests flaky.





[GitHub] spark issue #17768: [WIP][SPARK-20465][CORE] Throws a proper exception when ...

2017-04-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/17768
  
retest this please





[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...

2017-04-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17777
  
LGTM except for two minor comments.





[GitHub] spark pull request #17777: [SPARK-20482][SQL] Resolving Casts is too strict ...

2017-04-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17777#discussion_r113609686
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 ---
@@ -165,6 +181,14 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
   override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
 copy(timeZoneId = Option(timeZoneId))
 
+  // When this cast involves TimeZone, it's only resolved if the 
timeZoneId is set;
+  // Otherwise behave like Expression.resolved.
+  override lazy val resolved: Boolean =
+childrenResolved && checkInputDataTypes().isSuccess &&
+(!needsTimeZone || timeZoneId.isDefined)
--- End diff --

Nit: it fits on one line. Could you just move it to the same line:
```Scala
childrenResolved && checkInputDataTypes().isSuccess && (!needsTimeZone 
|| timeZoneId.isDefined)
```





[GitHub] spark pull request #17777: [SPARK-20482][SQL] Resolving Casts is too strict ...

2017-04-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17777#discussion_r113609438
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 ---
@@ -89,6 +89,22 @@ object Cast {
 case _ => false
   }
 
+  def needsTimeZone(from: DataType, to: DataType): Boolean = (from, to) 
match {
--- End diff --

Nit: private? 

Could you write comments to explain why we only consider these type pairs?





[GitHub] spark pull request #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite ...

2017-04-26 Thread tejasapatil
GitHub user tejasapatil opened a pull request:

https://github.com/apache/spark/pull/17780

[SPARK-20487][SQL] `HiveTableScan` node is quite verbose in explained plan

## What changes were proposed in this pull request?

Changed `TreeNode.argString` to handle `CatalogTable` separately (otherwise 
it would call the default `toString` on the `CatalogTable`)
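
As a toy, self-contained illustration of the idea (`CatalogTableLike` and this `argString` are assumed stand-ins, not the actual Spark code): the one-line argument string of a plan node special-cases the verbose catalog-table argument with a compact rendering instead of its default multi-line `toString`.

```scala
// Toy stand-ins for illustration only, not Spark's classes.
final case class CatalogTableLike(database: String, table: String) {
  override def toString: String =
    s"CatalogTable(\nDatabase: $database\nTable: $table\n...many more fields...)"
}

def argString(args: Seq[Any]): String = args.map {
  case t: CatalogTableLike => s"`${t.database}`.`${t.table}`"   // compact, special-cased rendering
  case other => other.toString                                  // default rendering for everything else
}.mkString(", ")

// argString(Seq(CatalogTableLike("default", "my_table"), Seq("user_id", "name")))
// ==> "`default`.`my_table`, List(user_id, name)"
```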

## How was this patch tested?

- Expanded scope of existing unit test to ensure that verbose information 
is not present
- Manual testing

Before

```
scala> hc.sql(" SELECT * FROM my_table WHERE name = 'foo' ").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('name = foo)
   +- 'UnresolvedRelation `my_table`

== Analyzed Logical Plan ==
user_id: bigint, name: string, ds: string
Project [user_id#13L, name#14, ds#15]
+- Filter (name#14 = foo)
   +- SubqueryAlias my_table
  +- CatalogRelation CatalogTable(
Database: default
Table: my_table
Owner: tejasp
Created: Fri Apr 14 17:05:50 PDT 2017
Last Access: Wed Dec 31 16:00:00 PST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/tmp/warehouse/my_table
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`ds`]
Schema: root
-- user_id: long (nullable = true)
-- name: string (nullable = true)
-- ds: string (nullable = true)
), [user_id#13L, name#14], [ds#15]

== Optimized Logical Plan ==
Filter (isnotnull(name#14) && (name#14 = foo))
+- CatalogRelation CatalogTable(
Database: default
Table: my_table
Owner: tejasp
Created: Fri Apr 14 17:05:50 PDT 2017
Last Access: Wed Dec 31 16:00:00 PST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/tmp/warehouse/my_table
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`ds`]
Schema: root
-- user_id: long (nullable = true)
-- name: string (nullable = true)
-- ds: string (nullable = true)
), [user_id#13L, name#14], [ds#15]

== Physical Plan ==
*Filter (isnotnull(name#14) && (name#14 = foo))
+- HiveTableScan [user_id#13L, name#14, ds#15], CatalogRelation 
CatalogTable(
Database: default
Table: my_table
Owner: tejasp
Created: Fri Apr 14 17:05:50 PDT 2017
Last Access: Wed Dec 31 16:00:00 PST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/tmp/warehouse/my_table
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`ds`]
Schema: root
-- user_id: long (nullable = true)
-- name: string (nullable = true)
-- ds: string (nullable = true)
), [user_id#13L, name#14], [ds#15]
```

After

```
scala> hc.sql(" SELECT * FROM my_table WHERE name = 'foo' ").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('name = foo)
   +- 'UnresolvedRelation `my_table`

== Analyzed Logical Plan ==
user_id: bigint, name: string, ds: string
Project [user_id#13L, name#14, ds#15]
+- Filter (name#14 = foo)
   +- SubqueryAlias my_table
  +- CatalogRelation `default`.`my_table`, [user_id#13L, name#14], 
[ds#15]

== Optimized Logical Plan ==
Filter (isnotnull(name#14) && (name#14 = foo))
+- CatalogRelation `default`.`my_table`, [user_id#13L, name#14], [ds#15]

== Physical Plan ==
*Filter (isnotnull(name#14) && (name#14 = foo))
+- HiveTableScan [user_id#13L, name#14, ds#15], CatalogRelation 
`default`.`my_table`, [user_id#13L, name#14], [ds#15]
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tejasapatil/spark SPARK-20487_verbose_plan

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17780.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17780


commit 768738c4503938dfb467f8cfd22ffe01bc098c07
Author: Tejas Patil 
Date:   2017-04-27T03:03:56Z

[SPARK-20487][SQL] `HiveTableScan` node is quite verbose in ex

[GitHub] spark issue #17780: [SPARK-20487][SQL] `HiveTableScan` node is quite verbose...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17780
  
**[Test build #76214 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76214/testReport)**
 for PR 17780 at commit 
[`768738c`](https://github.com/apache/spark/commit/768738c4503938dfb467f8cfd22ffe01bc098c07).





[GitHub] spark issue #17779: [DOCS][MINOR] Add missing since to SparkR repeat_string ...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17779
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76212/
Test PASSed.





[GitHub] spark issue #17779: [DOCS][MINOR] Add missing since to SparkR repeat_string ...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17779
  
Merged build finished. Test PASSed.





[GitHub] spark issue #17779: [DOCS][MINOR] Add missing since to SparkR repeat_string ...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17779
  
**[Test build #76212 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76212/testReport)**
 for PR 17779 at commit 
[`e5c1415`](https://github.com/apache/spark/commit/e5c141511b35b17bc6ca3c3f1fcc302449cf4e8f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113605788
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
+  s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
+val seed = $(this.seed)
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = $(this.vectorSize)
+
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapbc = sc.broadcast(vocabMap)
+val unigramTablebc = sc.broadcast(uniTable)
+
+val window = $(windowSize)
+
+val digitSentences = input.flatMap{sentence =>
--- End diff --

I might be missing something, but what is meant by ``digitSentences``?





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17735
  
Merged build finished. Test PASSed.





[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113605682
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
+  s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
+val seed = $(this.seed)
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = $(this.vectorSize)
+
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapbc = sc.broadcast(vocabMap)
+val unigramTablebc = sc.broadcast(uniTable)
+
+val window = $(windowSize)
+
+val digitSentences = input.flatMap{sentence =>
+  val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
+  wordIndexes.grouped($(maxSentenceLength)).map(_.toArray)
+}.repartition($(numPartitions)).cache()
+
+val learningRate = $(stepSize)
+
+val wordsPerPartition = totalWordCount / $(numPartitions)
+
+logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
$totalWordCount")
+
+for {iteration <- 1 to $(maxIter)} {
+  logInfo(s"Starting iteration: $iteration")
+  val iterationStartTime = System.nanoTime()
+
+  val syn0bc = sc.broadcast(syn0Global)
+  val syn1bc = sc.broadcast(syn1Global)
+
+  val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, 
iter) =>
+logInfo(s"Iteration: $iteration, Partition: $i_")
+logInfo(s"Numerical lib class being used : 
${blas.getClass.getName}")
+val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ 
((-iteration - 1) << 8))
+val contextWordPairs = iter.flatMap(generateContextWordPairs(_, 
window, random))
+
+val groupedBatches = contextWordPairs.grouped(batchSize)
+
+val negLabels = 1.0f +: 

[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17735
  
**[Test build #76208 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76208/testReport)**
 for PR 17735 at commit 
[`98eb3dc`](https://github.com/apache/spark/commit/98eb3dcdd15d9c5649bbca7effe4f0077130c352).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #17768: [WIP][SPARK-20465][CORE] Throws a proper exception when ...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17768
  
**[Test build #76213 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76213/testReport)**
 for PR 17768 at commit 
[`b5c9839`](https://github.com/apache/spark/commit/b5c9839476fe0ddfe2a194afd89a68542c749a6f).





[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113605031
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
+  s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
+val seed = $(this.seed)
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = $(this.vectorSize)
+
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapbc = sc.broadcast(vocabMap)
+val unigramTablebc = sc.broadcast(uniTable)
+
+val window = $(windowSize)
+
+val digitSentences = input.flatMap{sentence =>
+  val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
+  wordIndexes.grouped($(maxSentenceLength)).map(_.toArray)
+}.repartition($(numPartitions)).cache()
+
+val learningRate = $(stepSize)
+
+val wordsPerPartition = totalWordCount / $(numPartitions)
+
+logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
$totalWordCount")
+
+for {iteration <- 1 to $(maxIter)} {
+  logInfo(s"Starting iteration: $iteration")
+  val iterationStartTime = System.nanoTime()
+
+  val syn0bc = sc.broadcast(syn0Global)
+  val syn1bc = sc.broadcast(syn1Global)
+
+  val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, 
iter) =>
+logInfo(s"Iteration: $iteration, Partition: $i_")
+logInfo(s"Numerical lib class being used : 
${blas.getClass.getName}")
+val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ 
((-iteration - 1) << 8))
+val contextWordPairs = iter.flatMap(generateContextWordPairs(_, 
window, random))
+
+val groupedBatches = contextWordPairs.grouped(batchSize)
+
+val negLabels = 1.0f +: 

[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113605000
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
+  s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
+val seed = $(this.seed)
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = $(this.vectorSize)
+
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapbc = sc.broadcast(vocabMap)
+val unigramTablebc = sc.broadcast(uniTable)
+
+val window = $(windowSize)
+
+val digitSentences = input.flatMap{sentence =>
+  val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
+  wordIndexes.grouped($(maxSentenceLength)).map(_.toArray)
+}.repartition($(numPartitions)).cache()
+
+val learningRate = $(stepSize)
+
+val wordsPerPartition = totalWordCount / $(numPartitions)
+
+logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
$totalWordCount")
+
+for {iteration <- 1 to $(maxIter)} {
+  logInfo(s"Starting iteration: $iteration")
+  val iterationStartTime = System.nanoTime()
+
+  val syn0bc = sc.broadcast(syn0Global)
+  val syn1bc = sc.broadcast(syn1Global)
+
+  val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, 
iter) =>
+logInfo(s"Iteration: $iteration, Partition: $i_")
+logInfo(s"Numerical lib class being used : 
${blas.getClass.getName}")
+val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ 
((-iteration - 1) << 8))
+val contextWordPairs = iter.flatMap(generateContextWordPairs(_, 
window, random))
+
+val groupedBatches = contextWordPairs.grouped(batchSize)
+
+val negLabels = 1.0f +: 

[GitHub] spark issue #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Words mode...

2017-04-26 Thread Krimit
Github user Krimit commented on the issue:

https://github.com/apache/spark/pull/17673
  
@shubhamchopra have you run this code in a distributed spark cluster yet?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17776
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-04-26 Thread wesm
Github user wesm commented on the issue:

https://github.com/apache/spark/pull/15821
  
Note: we are shooting for an Arrow RC in the Monday time frame, so with luck 
we'll have a release cut next week.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17776
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76207/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

2017-04-26 Thread yssharma
Github user yssharma commented on the issue:

https://github.com/apache/spark/pull/17467
  
@budde would you like to share your thoughts on the new changes when you 
have time?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17776: [SPARK-20476] [SQL] Exclude Comma From Our Auto-generate...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17776
  
**[Test build #76207 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76207/testReport)**
 for PR 17776 at commit 
[`24f37f1`](https://github.com/apache/spark/commit/24f37f1df3350b3e9977f617c07febeff16cba6c).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class ComplexTypesSuite extends PlanTest`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17779: [DOCS][MINOR] Add missing since to SparkR repeat_string ...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17779
  
**[Test build #76212 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76212/testReport)**
 for PR 17779 at commit 
[`e5c1415`](https://github.com/apache/spark/commit/e5c141511b35b17bc6ca3c3f1fcc302449cf4e8f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113603661
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
+  s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
+val seed = $(this.seed)
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = $(this.vectorSize)
+
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapbc = sc.broadcast(vocabMap)
+val unigramTablebc = sc.broadcast(uniTable)
+
+val window = $(windowSize)
+
+val digitSentences = input.flatMap{sentence =>
+  val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
+  wordIndexes.grouped($(maxSentenceLength)).map(_.toArray)
+}.repartition($(numPartitions)).cache()
+
+val learningRate = $(stepSize)
+
+val wordsPerPartition = totalWordCount / $(numPartitions)
+
+logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
$totalWordCount")
+
+for {iteration <- 1 to $(maxIter)} {
+  logInfo(s"Starting iteration: $iteration")
+  val iterationStartTime = System.nanoTime()
+
+  val syn0bc = sc.broadcast(syn0Global)
+  val syn1bc = sc.broadcast(syn1Global)
+
+  val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, 
iter) =>
+logInfo(s"Iteration: $iteration, Partition: $i_")
+logInfo(s"Numerical lib class being used : 
${blas.getClass.getName}")
+val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ 
((-iteration - 1) << 8))
+val contextWordPairs = iter.flatMap(generateContextWordPairs(_, 
window, random))
+
+val groupedBatches = contextWordPairs.grouped(batchSize)
+
+val negLabels = 1.0f +: 

[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113603493
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
+  s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
+val seed = $(this.seed)
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = $(this.vectorSize)
+
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapbc = sc.broadcast(vocabMap)
+val unigramTablebc = sc.broadcast(uniTable)
+
+val window = $(windowSize)
+
+val digitSentences = input.flatMap{sentence =>
+  val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
+  wordIndexes.grouped($(maxSentenceLength)).map(_.toArray)
+}.repartition($(numPartitions)).cache()
+
+val learningRate = $(stepSize)
+
+val wordsPerPartition = totalWordCount / $(numPartitions)
+
+logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
$totalWordCount")
+
+for {iteration <- 1 to $(maxIter)} {
+  logInfo(s"Starting iteration: $iteration")
+  val iterationStartTime = System.nanoTime()
+
+  val syn0bc = sc.broadcast(syn0Global)
+  val syn1bc = sc.broadcast(syn1Global)
+
+  val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, 
iter) =>
+logInfo(s"Iteration: $iteration, Partition: $i_")
+logInfo(s"Numerical lib class being used : 
${blas.getClass.getName}")
--- End diff --

Are these logs really necessary?
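
If some of the information is still wanted, one alternative (my assumption, not 
something in the patch) is to demote it to debug level so it only appears when 
explicitly enabled:

```
// sketch: keep the per-partition detail, but only at debug level
logDebug(s"Iteration: $iteration, Partition: $i_, BLAS impl: ${blas.getClass.getName}")
```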


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.

[GitHub] spark pull request #17779: [DOCS][MINOR] Add missing since to SparkR repeat_...

2017-04-26 Thread zero323
GitHub user zero323 opened a pull request:

https://github.com/apache/spark/pull/17779

[DOCS][MINOR] Add missing since to SparkR repeat_string note.

## What changes were proposed in this pull request?

Replace

@note repeat_string 2.3.0

with 
  
@note repeat_string since 2.3.0

## How was this patch tested?

`create-docs.sh`


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zero323/spark REPEAT-NOTE

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17779.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17779


commit e5c141511b35b17bc6ca3c3f1fcc302449cf4e8f
Author: zero323 
Date:   2017-04-27T02:20:56Z

Add missing since to repeat note




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/1
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/1
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76205/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/1
  
**[Test build #76205 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76205/testReport)**
 for PR 1 at commit 
[`27ec604`](https://github.com/apache/spark/commit/27ec604f23ff76e943c395ead144538480ef51ed).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113603142
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
+  s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
+val seed = $(this.seed)
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = $(this.vectorSize)
+
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapbc = sc.broadcast(vocabMap)
+val unigramTablebc = sc.broadcast(uniTable)
+
+val window = $(windowSize)
+
+val digitSentences = input.flatMap{sentence =>
+  val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
+  wordIndexes.grouped($(maxSentenceLength)).map(_.toArray)
--- End diff --

I think you need to broadcast ``maxSentenceLength``
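
A minimal sketch of that suggestion (variable name assumed): reading the param 
into a local val on the driver keeps the closure from serializing the whole 
estimator, and an explicit ``sc.broadcast`` would behave the same way for 
larger values.

```
// sketch (assumed names): capture the param value once on the driver so the
// closure only serializes a plain Int rather than the enclosing Word2Vec instance
val maxSentenceLen = $(maxSentenceLength)
val digitSentences = input.flatMap { sentence =>
  val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
  wordIndexes.grouped(maxSentenceLen).map(_.toArray)
}.repartition($(numPartitions)).cache()
```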


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-04-26 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/15821
  
Updated to work with the latest Arrow to prepare for the 0.3 release (tests 
should fail because that artifact is not yet available). Also improved 
consistency of ArrowConverters and did some cleanup. From @rxin's comments:

> Move ArrowConverters.scala somewhere else that's not top level, e.g. 
execution.arrow

It is now in the o.a.s.sql.execution.arrow package

> Update this to arrow 0.3

Ready to do this, should just need to update the pom again

> Use SQLConf rather than a parameter for toPandas.

I removed this flag and used the conf "spark.sql.execution.arrow.enable" 
which defaults to "false"

> Handle failure gracefully if Arrow is not installed (or somehow package it 
with Spark?)

It would be difficult to package with Spark, I think, because pyarrow also 
depends on the native Arrow cpp library.  I changed it to fail gracefully if 
pyarrow is not available.  The error message is:
```
ImportError: No module named pyarrow
note: pyarrow must be installed and available on calling Python process if 
using spark.sql.execution.arrow.enable=true
```

> How is the memory managed? Who allocates the memory for the Arrow 
records, and who's responsible for releasing them?

The Java side of Arrow requires using a BufferAllocator class that manages 
the allocated memory. An instance of this must be used each time an 
ArrowRecordBatch is created, and the batch and allocator must be 
released/closed after they have been processed. This is all handled in the 
`ArrowConverter` functions. On the Python side, buffers are allocated from the 
Arrow C++ library and cleaned up when reference counts to the objects are zero. 
The end user does not have to worry about managing any memory.
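
A minimal sketch of that allocate/close discipline on the JVM side (the class 
name comes from the Arrow Java library; the limit value and the body are 
assumptions, not the actual ArrowConverters code):

```
import org.apache.arrow.memory.RootAllocator

val allocator = new RootAllocator(Long.MaxValue)
try {
  // build and write an ArrowRecordBatch using `allocator`, closing the batch when done
} finally {
  allocator.close()  // frees every buffer this allocator handed out
}
```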



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113602908
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
+  s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
+val seed = $(this.seed)
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = $(this.vectorSize)
+
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapbc = sc.broadcast(vocabMap)
--- End diff --

nit: ``vocabMapBroadcast``, same for ``unigramTable``


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17735
  
**[Test build #76211 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76211/testReport)**
 for PR 17735 at commit 
[`db11db0`](https://github.com/apache/spark/commit/db11db050f378768a22520bd95b869c679638430).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113602640
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
--- End diff --

No need to return ``vocabMap.size``, that information is duplicated from 
``vocabMap``
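
A sketch of the simplification (assuming callers derive the size from the map):

```
// return only what cannot be derived
(totalWordCount, vocabMap, unigramTable)

// at the call site
val (totalWordCount, vocabMap, uniTable) = generateVocab(input)
val vocabSize = vocabMap.size
```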


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113602559
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
+val (vocabSize, totalWordCount, vocabMap, uniTable) = 
generateVocab(input)
+val negSamples = $(negativeSamples)
+assert(negSamples < vocabSize,
--- End diff --

Is this assertion truly needed? What about just ``negSamples = 
min($(negSamples) - 1, vocabSize)``
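
A sketch of the capping alternative; note the placement of the ``- 1`` below is 
my reading of the suggestion, moved onto ``vocabSize`` so the invariant 
``negSamples < vocabSize`` still holds:

```
// sketch (assumption): cap the value instead of asserting
val negSamples = math.min($(negativeSamples), vocabSize - 1)
```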


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113602341
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
+   */
+  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): 
feature.Word2VecModel = {
--- End diff --

I think the CBOW logic should definitely live outside of this class. I'm 
even wondering if it's worthwhile to update the mllib Word2Vec to have access 
to this as well. @MLnick @srowen thoughts?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/1
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76204/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/1
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17777: [SPARK-20482][SQL] Resolving Casts is too strict on havi...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/1
  
**[Test build #76204 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76204/testReport)**
 for PR 1 at commit 
[`76debfb`](https://github.com/apache/spark/commit/76debfb9bd82c63bc354a71d95ab84db0f609fa3).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113602046
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * @param input
+   * @return
--- End diff --

nit: fill these out


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113601993
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
+// and use binary search to get insertion points. This should 
replicate the same
+// behavior as the table in original implementation.
+val weights = vocab.map(x => scala.math.pow(x._2, power))
+val totalWeight = weights.sum
+
+val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => 
(x / totalWeight))
+
+val unigramTableSize =
+  math.min(maxUnigramTableSize, unigramTableSizeFactor * 
normalizedCumWeights.length)
+val unigramTable = generateUnigramTable(normalizedCumWeights, 
unigramTableSize)
+
+(vocabMap.size, totalWordCount, vocabMap, unigramTable)
+  }
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
--- End diff --

Nit: It's worthwhile to describe how this is parallelized. What happens if 
the skip-gram implementation changes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15821
  
**[Test build #76210 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76210/testReport)**
 for PR 15821 at commit 
[`b6fe733`](https://github.com/apache/spark/commit/b6fe733955d6e153722b1945c09ed663d8ed9be2).
 * This patch **fails build dependency tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15821
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/76210/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15821
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113601799
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
--- End diff --

maybe call this ``wordCounts`` or something like that to make it clearer?
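
For illustration, a minimal sketch of the rename (same logic, just a clearer name; ``$(minCount)`` is the existing param accessor):

```scala
// Sketch only: identical aggregation, with a name that says what it holds.
val wordCounts = words.map(w => (w, 1L))
  .reduceByKey(_ + _)
  .filter { case (_, count) => count >= $(minCount) }
  .collect()
  .sortWith { case ((_, c1), (_, c2)) => c1 > c2 }
```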


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15821
  
**[Test build #76210 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76210/testReport)**
 for PR 15821 at commit 
[`b6fe733`](https://github.com/apache/spark/commit/b6fe733955d6e153722b1945c09ed663d8ed9be2).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113601656
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
+  w -> i
+}.toMap
+
+// We create a cumulative distribution array, unlike the original 
implemention
--- End diff --

I'm curious, why ``unlike the original implemention``? Why not follow the 
original? (Also, typo: should be ``implementation``.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113601513
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
+  .collect()
+  .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
+
+val totalWordCount = vocab.map(_._2).sum
+
+val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
--- End diff --

``vocab.keySet.zipWithIndex``
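
(For reference — ``vocab`` is collected here as an ``Array`` of (word, count) pairs rather than a ``Map``, so the compact equivalent of this suggestion would be a sketch along these lines:)

```scala
// Sketch only: drop the counts before zipping, then build the map directly.
val vocabMap = vocab.map(_._1).zipWithIndex.toMap
```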


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-26 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113600926
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ArrowConvertersSuite.scala ---
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
+import java.text.SimpleDateFormat
+import java.util.Locale
+
+import com.google.common.io.Files
+import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot}
+import org.apache.arrow.vector.file.json.JsonFileReader
+import org.apache.arrow.vector.util.Validator
+import org.json4s.jackson.JsonMethods._
+import org.json4s.JsonAST._
+import org.json4s.JsonDSL._
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+
+class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll 
{
+  import testImplicits._
+
+  private var tempDataPath: String = _
+
+  private def collectAsArrow(df: DataFrame,
+ converter: Option[ArrowConverters] = None): 
ArrowPayload = {
+val cnvtr = converter.getOrElse(new ArrowConverters)
+val payloadByteArrays = df.toArrowPayloadBytes().collect()
+cnvtr.readPayloadByteArrays(payloadByteArrays)
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+tempDataPath = Utils.createTempDir(namePrefix = 
"arrow").getAbsolutePath
+  }
+
+  test("collect to arrow record batch") {
+val indexData = (1 to 6).toDF("i")
+val arrowPayload = collectAsArrow(indexData)
+assert(arrowPayload.nonEmpty)
+val arrowBatches = arrowPayload.toArray
+assert(arrowBatches.length == indexData.rdd.getNumPartitions)
+val rowCount = arrowBatches.map(batch => batch.getLength).sum
+assert(rowCount === indexData.count())
+arrowBatches.foreach(batch => assert(batch.getNodes.size() > 0))
+arrowBatches.foreach(batch => batch.close())
+  }
+
+  test("numeric type conversion") {
+collectAndValidate(indexData)
+collectAndValidate(shortData)
+collectAndValidate(intData)
+collectAndValidate(longData)
+collectAndValidate(floatData)
+collectAndValidate(doubleData)
+  }
+
+  test("mixed numeric type conversion") {
+collectAndValidate(mixedNumericData)
+  }
+
+  test("boolean type conversion") {
+collectAndValidate(boolData)
+  }
+
+  test("string type conversion") {
+collectAndValidate(stringData)
+  }
+
+  test("byte type conversion") {
+collectAndValidate(byteData)
+  }
+
+  ignore("timestamp conversion") {
+collectAndValidate(timestampData)
+  }
+
+  // TODO: Not currently supported in Arrow JSON reader
+  ignore("date conversion") {
+// collectAndValidate(dateTimeData)
+  }
+
+  // TODO: Not currently supported in Arrow JSON reader
+  ignore("binary type conversion") {
+// collectAndValidate(binaryData)
+  }
+
+  test("floating-point NaN") {
+collectAndValidate(floatNaNData)
+  }
+
+  test("partitioned DataFrame") {
+val converter = new ArrowConverters
+val schema = testData2.schema
+val arrowPayload = collectAsArrow(testData2, Some(converter))
+val arrowBatches = arrowPayload.toArray
+// NOTE: testData2 should have 2 partitions -> 2 arrow batches in 
payload
+assert(arrowBatches.length === 2)
+val pl1 = new ArrowStaticPayload(arrowBatches(0))
+val pl2 = new ArrowStaticPayload(arrowBatches(1))
+// Generate JSON files
+val a = List[Int](1, 1, 2, 2, 3, 3)
+val b = List[Int](1, 2, 1, 2, 1, 2)
+val fields

[GitHub] spark pull request #17191: [SPARK-14471][SQL] Aliases in SELECT could be use...

2017-04-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17191#discussion_r113600909
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -998,6 +998,22 @@ class Analyzer(
   }
 
   /**
+   * Replace unresolved expressions in grouping keys with resolved ones in 
SELECT clauses.
--- End diff --

ok, I'll update


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17191: [SPARK-14471][SQL] Aliases in SELECT could be use...

2017-04-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17191#discussion_r113600878
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -998,6 +998,22 @@ class Analyzer(
   }
 
   /**
+   * Replace unresolved expressions in grouping keys with resolved ones in 
SELECT clauses.
+   */
+  object ResolveAggAliasInGroupBy extends Rule[LogicalPlan] {
+
+override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperators {
+  case agg @ Aggregate(groups, aggs, child)
+  if conf.groupByAliases && child.resolved && 
aggs.forall(_.resolved) &&
+groups.exists(!_.resolved) =>
--- End diff --

ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17733: [SPARK-20425][SQL] Support a vertical display mode for D...

2017-04-26 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/17733
  
@gatorsmile Could you check again? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113600716
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
+val sc = input.context
+
+val words = input.flatMap(x => x)
+
+val vocab = words.map(w => (w, 1L))
+  .reduceByKey(_ + _)
+  .filter{case (w, c) => c >= $(minCount)}
--- End diff --

You should broadcast ``$(minCount)`` to avoid shipping the entire class to 
each task. (and make sure to destroy it at the end of the method)
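
To illustrate (just a sketch, not the actual patch): capturing the value in a local val, or in a broadcast variable, keeps the closure from dragging the whole estimator into each task.

```scala
// Sketch only: the closure now captures a plain value instead of `this`.
val minCountLocal = $(minCount)          // or: val bcMinCount = sc.broadcast($(minCount))
val vocab = words.map(w => (w, 1L))
  .reduceByKey(_ + _)
  .filter { case (_, c) => c >= minCountLocal }   // with a broadcast: c >= bcMinCount.value
  .collect()
  .sortWith { case ((_, c1), (_, c2)) => c1 > c2 }
// If a broadcast variable is used, destroy it once the vocabulary is built:
// bcMinCount.destroy()
```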


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113600548
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
+  }
+  a += 1
+}
+table
+  }
+
+  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
+  (Int, Long, Map[String, Int], Array[Int]) = {
--- End diff --

This could use a named class. It's not immediately clear what each of these 
values is.
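
For example, a minimal sketch (hypothetical names) of what a named result type could look like:

```scala
// Sketch only: a small case class makes the return value self-documenting
// instead of a positional 4-tuple.
private case class VocabSummary(
    vocabSize: Int,
    totalWordCount: Long,
    vocabMap: Map[String, Int],
    unigramTable: Array[Int])

private def generateVocab[S <: Iterable[String]](input: RDD[S]): VocabSummary = {
  // ... body unchanged ...
  VocabSummary(vocabMap.size, totalWordCount, vocabMap, unigramTable)
}
```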


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-26 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113600552
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,432 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * TODO: This is available in arrow-vector now with ARROW-615, to be 
included in 0.2.1 release
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: 
Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends 
ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  /**
+   * Iterate over the rows and convert to an ArrowPayload, using 
RootAllocator from this class
+   */
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: 
StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, 
schema, _allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  /**
+   * Read an Array of Arrow Record batches as byte Arrays into an 
ArrowPayload, using
+   * RootAllocator from this class
+   */
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): 
ArrowPayload = {
+val batches = 
scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableS

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-26 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113600492
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,432 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * TODO: This is available in arrow-vector now with ARROW-615, to be 
included in 0.2.1 release
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: 
Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends 
ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  /**
+   * Iterate over the rows and convert to an ArrowPayload, using 
RootAllocator from this class
+   */
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: 
StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, 
schema, _allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  /**
+   * Read an Array of Arrow Record batches as byte Arrays into an 
ArrowPayload, using
+   * RootAllocator from this class
+   */
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): 
ArrowPayload = {
+val batches = 
scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableS

[GitHub] spark pull request #17767: [SPARK-20468] Refactor the ALS code

2017-04-26 Thread danielyli
Github user danielyli closed the pull request at:

https://github.com/apache/spark/pull/17767


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-26 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113600390
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,432 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * TODO: This is available in arrow-vector now with ARROW-615, to be 
included in 0.2.1 release
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: 
Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends 
ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  /**
+   * Iterate over the rows and convert to an ArrowPayload, using 
RootAllocator from this class
+   */
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: 
StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, 
schema, _allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  /**
+   * Read an Array of Arrow Record batches as byte Arrays into an 
ArrowPayload, using
+   * RootAllocator from this class
+   */
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): 
ArrowPayload = {
+val batches = 
scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableS

[GitHub] spark issue #17767: [SPARK-20468] Refactor the ALS code

2017-04-26 Thread danielyli
Github user danielyli commented on the issue:

https://github.com/apache/spark/pull/17767
  
Closing this PR as per 
[SPARK-20468](https://issues.apache.org/jira/browse/SPARK-20468?focusedCommentId=15984365&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15984365).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113600388
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
+while (a < table.length) {
+  table.update(a, i)
+  if (a.toFloat / table.length >= normalizedWeights(i)) {
+i = math.min(normalizedWeights.length - 1, i + 1)
--- End diff --

An ``if else`` here would be much more legible
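
Something like this sketch, for instance (equivalent to the ``math.min`` clamp):

```scala
// Sketch only: advance to the next weight unless we are already on the last one.
if (a.toFloat / table.length >= normalizedWeights(i)) {
  if (i < normalizedWeights.length - 1) {
    i += 1
  }
  // else: stay on the last weight
}
```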


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113600264
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
+   * We sacrifice memory here, to get constant time lookups into this 
array when generating
+   * negative samples.
+   */
+  private def generateUnigramTable(normalizedWeights: Array[Double], 
tableSize: Int): Array[Int] = {
+val table = Array.fill(tableSize)(0)
+var a = 0
+var i = 0
--- End diff --

Could you please use more descriptive variable names here? I was expecting 
``i`` to be the index into the table array
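
For example, a sketch with hypothetical names (``tablePos`` for the position in the table, ``vocabIdx`` for the position in the cumulative weights):

```scala
// Sketch only: same loop, with names that say which array each index walks.
var tablePos = 0
var vocabIdx = 0
while (tablePos < table.length) {
  table.update(tablePos, vocabIdx)
  if (tablePos.toFloat / table.length >= normalizedWeights(vocabIdx)) {
    vocabIdx = math.min(normalizedWeights.length - 1, vocabIdx + 1)
  }
  tablePos += 1
}
```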


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-26 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113600189
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,432 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * TODO: This is available in arrow-vector now with ARROW-615, to be 
included in 0.2.1 release
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: 
Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends 
ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  /**
+   * Iterate over the rows and convert to an ArrowPayload, using 
RootAllocator from this class
+   */
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: 
StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, 
schema, _allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  /**
+   * Read an Array of Arrow Record batches as byte Arrays into an 
ArrowPayload, using
+   * RootAllocator from this class
+   */
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): 
ArrowPayload = {
+val batches = 
scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableS

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-26 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113600062
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,432 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * TODO: This is available in arrow-vector now with ARROW-615, to be 
included in 0.2.1 release
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: 
Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
--- End diff --

No, I think this should be changed a little.  `ArrowPayload` is meant to 
encapsulate the Arrow classes from the rest of Spark and wrap the Arrow data so 
that it is `Serializable`, which allows an `RDD[ArrowPayload]`.  I'll push an 
update that will clean this up.
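
Roughly along these lines (purely a hypothetical sketch, not the actual update): carry the serialized batch bytes instead of the Arrow objects, so the payload is plain-Java-serializable and Arrow types stay out of the public surface.

```scala
// Hypothetical sketch only: the real change may look quite different.
private[sql] class ArrowPayloadBytes(val batchBytes: Array[Byte]) extends Serializable
```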


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17596: [SPARK-12837][CORE] Do not send the name of internal acc...

2017-04-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17596
  
**[Test build #76209 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76209/testReport)**
 for PR 17596 at commit 
[`0f028b1`](https://github.com/apache/spark/commit/0f028b1f79b2f76ae6c1ea2243b72f211961ad02).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-04-26 Thread Krimit
Github user Krimit commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r113599808
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
---
@@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
+
+  /**
+   * Similar to InitUnigramTable in the original code. Instead of using an 
array of size 100 million
+   * like the original, we size it to be 20 times the vocabulary size.
--- End diff --

why 20 times the vocabulary size? Also, this comment is misplaced. It 
belongs with the ``unigramTableSizeFactor``
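
For instance, a sketch of keeping the rationale next to the factor it documents (assuming the factor is the 20 mentioned in the comment; the real definition lives elsewhere in this file):

```scala
// Sketch only: sizing rationale co-located with the constant it explains.
/**
 * The unigram table is sized at unigramTableSizeFactor * vocabulary size
 * (capped at maxUnigramTableSize), trading memory for constant-time lookups
 * when drawing negative samples.
 */
private val unigramTableSizeFactor = 20
```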


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


