[GitHub] spark pull request #19399: [SPARK-22175][WEB-UI] Add status column to histor...

2017-10-05 Thread caneGuy
Github user caneGuy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19399#discussion_r143114309
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -850,6 +869,18 @@ private[history] class AppListingListener(log: 
FileStatus, clock: Clock) extends
 fileSize)
 }
 
+def applicationStatus : Option[String] = {
+  if (startTime.getTime == -1) {
+Some("")
+  } else if (endTime.getTime == -1) {
+Some("")
--- End diff --

Agree with you @ajbozarth 


---




[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...

2017-10-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19419#discussion_r143113850
  
--- Diff: conf/spark-defaults.conf.template ---
@@ -25,3 +25,10 @@
 # spark.serializer 
org.apache.spark.serializer.KryoSerializer
 # spark.driver.memory  5g
 # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"
+
+#spark.ui.allowFramingFrom https://www.example.com/
+#spark.ui.xXssProtection   1; mode=block
+#spark.ui.xContentType.options nosniff
+
+#Enable below only when Spark is running on HTTPS
+#spark.ui.strictTransportSecurity  max-age=31536000
--- End diff --

Could you add a space after `#` and an empty line at the end of the file?
```
# spark.ui.allowFramingFrom https://www.example.com/
# spark.ui.xXssProtection   1; mode=block
# spark.ui.xContentType.options nosniff

# Enable below only when Spark is running on HTTPS
# spark.ui.strictTransportSecurity  max-age=31536000
```
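For context, these options carry the literal header values, so with the block above the Spark UI's HTTP responses would include the corresponding standard security headers, roughly as follows (a sketch; `allowFramingFrom` maps to `X-Frame-Options: ALLOW-FROM`, and the header names for the new options are assumed from the standards they reference):
```
X-Frame-Options: ALLOW-FROM https://www.example.com/
X-XSS-Protection: 1; mode=block
X-Content-Type-Options: nosniff
Strict-Transport-Security: max-age=31536000
```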


---




[GitHub] spark issue #19336: [SPARK-21947][SS] Check and report error when monotonica...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19336
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82492/
Test PASSed.


---




[GitHub] spark issue #19336: [SPARK-21947][SS] Check and report error when monotonica...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19336
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19336: [SPARK-21947][SS] Check and report error when monotonica...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19336
  
**[Test build #82492 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82492/testReport)**
 for PR 19336 at commit 
[`d5a9357`](https://github.com/apache/spark/commit/d5a9357162e262ab3e2e0284d95dd9d42297d6df).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143112965
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+Some(BDV.zeros[Double](k))
+  } else {
+None
+  }
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => 
{
--- End diff --

```
val elementWiseSum = (u: (BDM[Double], Option[BDV[Double]], Long),
v: (BDM[Double], Option[BDV[Double]], Long)) => {
```


---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-10-05 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/19294
  
@gatorsmile Sounds good. @szhem, can we remove the Spark SQL tests you added (at my request)?
Once the build passes, I will commit this - it will definitely help Spark core users.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82493/
Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82493 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82493/testReport)**
 for PR 18732 at commit 
[`20fb1fe`](https://github.com/apache/spark/commit/20fb1fe9cbf033d73ecf2851f9cb1dc94f41fb3e).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19419: [SPARK-22188] [CORE] Adding security headers for prevent...

2017-10-05 Thread krishna-pandey
Github user krishna-pandey commented on the issue:

https://github.com/apache/spark/pull/19419
  
@dongjoon-hyun Thanks for the review. Made the changes as suggested.


---




[GitHub] spark issue #19442: [SPARK-8515][ML][WIP] Improve ML Attribute API

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19442
  
**[Test build #82495 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82495/testReport)**
 for PR 19442 at commit 
[`77ced95`](https://github.com/apache/spark/commit/77ced957e7be2169ac0c59c76f60ab9d4fcac3ef).


---




[GitHub] spark pull request #19442: [SPARK-8515][ML][WIP] Improve ML Attribute API

2017-10-05 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/19442

[SPARK-8515][ML][WIP] Improve ML Attribute API

## What changes were proposed in this pull request?

The current ML attribute API is inefficient and not easy to use. This work improves the API with the following main changes:

* Support Spark vector-typed attributes.
* Simplify vector-typed attribute serialization.
* Keep a minimal set of APIs to support ML attributes.

## How was this patch tested?

Added tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-8515

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19442


commit 77d657d8bc8102081e4b0d7b5d42a256e64514d4
Author: Liang-Chi Hsieh 
Date:   2017-10-02T15:03:54Z

Init design of ml attribute.

commit 7837778e7cbbf83851b1a2b5047f4e6a8039f809
Author: Liang-Chi Hsieh 
Date:   2017-10-03T15:03:31Z

revise.

commit 97f6848f0cbb1a76b4434930ce8938da50eaafbe
Author: Liang-Chi Hsieh 
Date:   2017-10-03T15:14:02Z

revise.

commit 2e3a3541fc7a59ac63b2118228de8015c238de40
Author: Liang-Chi Hsieh 
Date:   2017-10-04T05:15:58Z

revise.

commit 0d76eac84f5837aefebc763687fa9c5c7e1aeb4d
Author: Liang-Chi Hsieh 
Date:   2017-10-04T15:07:57Z

revise.

commit 81cca5cccfa2556ff0bba5a73764d3f503040b13
Author: Liang-Chi Hsieh 
Date:   2017-10-05T04:30:48Z

revise.

commit 4813fe8a4bd19a02b7b6bff138f04e7e50f7cdd7
Author: Liang-Chi Hsieh 
Date:   2017-10-05T06:15:53Z

revise.

commit 7951f59027418962ad95465e439bff41876ecfa8
Author: Liang-Chi Hsieh 
Date:   2017-10-05T07:51:50Z

revise.

commit a381af3edf52132086af64360789cb3a7d20d61e
Author: Liang-Chi Hsieh 
Date:   2017-10-05T09:00:02Z

Add builder and test.

commit f25c89dbded0eb9dce25d8da63a1a1aa49ad459f
Author: Liang-Chi Hsieh 
Date:   2017-10-05T15:10:11Z

revise test.

commit 7e237f38088f2375f40f9a4c97aee2e6acd54328
Author: Liang-Chi Hsieh 
Date:   2017-10-06T02:46:07Z

Add new test.

commit 77ced957e7be2169ac0c59c76f60ab9d4fcac3ef
Author: Liang-Chi Hsieh 
Date:   2017-10-06T03:57:12Z

Add more tests.




---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-10-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19294
  
Since this is not related to Spark SQL, please do not add the test cases to 
the Spark SQL side. 


---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-10-05 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/19294
  
@gatorsmile have your concerns been addressed? If yes, I will merge this into master and 2.2.1.

This patch is clearly better than the existing state for 2.2 and master - for Spark core and some of the data sources I tested with.



---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19061
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82491/
Test PASSed.


---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19061
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19061
  
**[Test build #82491 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82491/testReport)**
 for PR 19061 at commit 
[`ff4b8e4`](https://github.com/apache/spark/commit/ff4b8e49355e1866a9f0f337cb0c7673e13fdcaf).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19440: [SPARK-21871][SQL] Fix infinite loop when bytecode size ...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19440
  
**[Test build #82494 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82494/testReport)**
 for PR 19440 at commit 
[`b8eb6a0`](https://github.com/apache/spark/commit/b8eb6a0e45ceb9592fbbf32a236aa17cd3e5dac0).


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82493 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82493/testReport)**
 for PR 18732 at commit 
[`20fb1fe`](https://github.com/apache/spark/commit/20fb1fe9cbf033d73ecf2851f9cb1dc94f41fb3e).


---




[GitHub] spark issue #19336: [SPARK-21947][SS] Check and report error when monotonica...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19336
  
**[Test build #82492 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82492/testReport)**
 for PR 19336 at commit 
[`d5a9357`](https://github.com/apache/spark/commit/d5a9357162e262ab3e2e0284d95dd9d42297d6df).


---




[GitHub] spark issue #19336: [SPARK-21947][SS] Check and report error when monotonica...

2017-10-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19336
  
ping @zsxwing Can you take a look? Thanks.


---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19061
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82488/
Test PASSed.


---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19061
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19061
  
**[Test build #82488 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82488/testReport)**
 for PR 19061 at commit 
[`6e59cf1`](https://github.com/apache/spark/commit/6e59cf1057a1f9d2584256a3423421c9f6924da1).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #17357: [SPARK-20025][CORE] Ignore SPARK_LOCAL* env, whil...

2017-10-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17357#discussion_r143096475
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala ---
@@ -23,14 +23,15 @@ import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, 
SparkSubmit}
+import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{ChildFirstURLClassLoader, 
MutableURLClassLoader, Utils}
 
 /**
  * Utility object for launching driver programs such that they share fate 
with the Worker process.
  * This is used in standalone cluster mode only.
  */
-object DriverWrapper {
+object DriverWrapper extends Logging {
--- End diff --

This `DriverWrapper` actually runs within the same JVM as the driver and initializes the log4j instance earlier. Will this be a problem?


---




[GitHub] spark issue #19363: [Minor]Override toString of KeyValueGroupedDataset

2017-10-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19363
  
Btw, I think this might need a minor/trivial JIRA ticket.


---




[GitHub] spark issue #19363: [Minor]Override toString of KeyValueGroupedDataset

2017-10-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19363
  
Although this `toString` looks good in the scala-shell, when you print it out directly it might look weird, because you just see:

```scala
[key: [value: string], value: [value: string]]
```

It's better to prefix it with the class name, like:

```scala
KeyValueGroupedDataset: [key: [value: string], value: [value: string]]
```


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82490/
Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82490 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82490/testReport)**
 for PR 18732 at commit 
[`d628f4e`](https://github.com/apache/spark/commit/d628f4ede208b2206d6469676fb4e0779dc8f320).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19440: [SPARK-21871][SQL] Fix infinite loop when bytecod...

2017-10-05 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19440#discussion_r143094785
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -392,12 +392,16 @@ case class WholeStageCodegenExec(child: SparkPlan) 
extends UnaryExecNode with Co
 
 // Check if compiled code has a too large function
 if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
-  logWarning(s"Found too long generated codes and JIT optimization 
might not work: " +
-s"the bytecode size was $maxCodeSize, this value went over the 
limit " +
+  logInfo(s"Found too long generated codes and JIT optimization might 
not work: " +
+s"the bytecode size ($maxCodeSize) is above the limit " +
 s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen 
was disabled " +
 s"for this plan. To avoid this, you can raise the limit " +
-s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
-  return child.execute()
+s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
+  child match {
+// For batch file source scan, we should continue executing it
+case f: FileSourceScanExec if f.supportsBatch => // do nothing
--- End diff --

Yea, I totally agree that we need to refactor this in the future. Anyway, it's OK for now.


---




[GitHub] spark issue #19363: [Minor]Override toString of KeyValueGroupedDataset

2017-10-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19363
  
Maybe add a simple test?
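
A minimal sketch of such a test, assuming the builder-based `toString` quoted elsewhere in this thread (the suite name and placement are hypothetical; it would more likely land in an existing suite such as `DatasetSuite`):
```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext

class KeyValueGroupedDatasetToStringSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("toString summarizes the key and value schemas") {
    // Key encoder is String, value encoder is the (String, Int) tuple.
    val grouped = Seq(("a", 1), ("b", 2)).toDS().groupByKey(_._1)
    assert(grouped.toString.contains("key: [value: string]"))
    assert(grouped.toString.contains("_1: string"))
  }
}
```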


---




[GitHub] spark pull request #19363: [Minor]Override toString of KeyValueGroupedDatase...

2017-10-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19363#discussion_r143094589
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -18,16 +18,17 @@
 package org.apache.spark.sql
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.api.java.function._
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateStruct}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.expressions.ReduceAggregator
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, 
OutputMode}
+import org.apache.spark.sql.types.StructType
--- End diff --

Why import `StructType`? I didn't see you use it.


---




[GitHub] spark pull request #19363: [Minor]Override toString of KeyValueGroupedDatase...

2017-10-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19363#discussion_r143094398
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -564,4 +565,30 @@ class KeyValueGroupedDataset[K, V] private[sql](
   encoder: Encoder[R]): Dataset[R] = {
 cogroup(other)((key, left, right) => f.call(key, left.asJava, 
right.asJava).asScala)(encoder)
   }
+
+  override def toString: String = {
+try {
+  val builder = new StringBuilder
+  val kFields = kExprEnc.schema.map {
+case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+  }
+  val vFields = vExprEnc.schema.map {
+case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+  }
+  builder.append("[key: [")
+  builder.append(kFields.take(2).mkString(", "))
+  if (kFields.length > 2) {
+builder.append(" ... " + (kFields.length - 2) + " more field(s)")
+  }
+  builder.append("], value: [")
+  builder.append(vFields.take(2).mkString(", "))
+  if (vFields.length > 2) {
+builder.append(" ... " + (vFields.length - 2) + " more field(s)")
+  }
+  builder.append("]]").toString()
+} catch {
+  case NonFatal(e) =>
--- End diff --

When will we encounter this error?


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143094078
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
-  val input: Array[InternalRow] = child.executeCollect()
-  if (input.length >= 51200) {
+  val (numRows, input) = child.executeCollectIterator()
+  if (numRows >= 51200) {
 throw new SparkException(
-  s"Cannot broadcast the table with more than 512 millions 
rows: ${input.length} rows")
+  s"Cannot broadcast the table with more than 512 millions 
rows: $numRows rows")
   }
+
   val beforeBuild = System.nanoTime()
   longMetric("collectTime") += (beforeBuild - beforeCollect) / 
100
-  val dataSize = 
input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+  // Construct the relation.
+  val relation = mode.transform(input, Some(numRows))
+
+  val dataSize = relation match {
+case map: HashedRelation =>
+  map.estimatedSize
+case arr: Array[InternalRow] =>
--- End diff --

`mode` is a `BroadcastMode`. So I think we won't hit this case?


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143093875
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
-  val input: Array[InternalRow] = child.executeCollect()
-  if (input.length >= 51200) {
+  val (numRows, input) = child.executeCollectIterator()
+  if (numRows >= 51200) {
 throw new SparkException(
-  s"Cannot broadcast the table with more than 512 millions 
rows: ${input.length} rows")
+  s"Cannot broadcast the table with more than 512 millions 
rows: $numRows rows")
   }
+
   val beforeBuild = System.nanoTime()
   longMetric("collectTime") += (beforeBuild - beforeCollect) / 
100
-  val dataSize = 
input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+  // Construct the relation.
+  val relation = mode.transform(input, Some(numRows))
+
+  val dataSize = relation match {
+case map: HashedRelation =>
+  map.estimatedSize
--- End diff --

The estimated size of the hashed relation may be larger than the accurate data size calculated before. I guess this may be a regression in some cases.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143093472
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
--- End diff --

This comment is obsolete.


---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19061
  
**[Test build #82491 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82491/testReport)**
 for PR 19061 at commit 
[`ff4b8e4`](https://github.com/apache/spark/commit/ff4b8e49355e1866a9f0f337cb0c7673e13fdcaf).


---




[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19061
  
It's done. Thank you for the review, @jiangxb1987.


---




[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143092704
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -203,6 +203,10 @@ package object config {
   private[spark] val HISTORY_UI_MAX_APPS =
 
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
 
+  private[spark] val UI_SHOW_CONSOLE_PROGRESS = 
ConfigBuilder("spark.ui.showConsoleProgress")
--- End diff --

Sure!


---




[GitHub] spark issue #19440: [SPARK-21871][SQL] Fix infinite loop when bytecode size ...

2017-10-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19440
  
LGTM except two minor comments.


---




[GitHub] spark pull request #19440: [SPARK-21871][SQL] Fix infinite loop when bytecod...

2017-10-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19440#discussion_r143092189
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -392,12 +392,16 @@ case class WholeStageCodegenExec(child: SparkPlan) 
extends UnaryExecNode with Co
 
 // Check if compiled code has a too large function
 if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
-  logWarning(s"Found too long generated codes and JIT optimization 
might not work: " +
-s"the bytecode size was $maxCodeSize, this value went over the 
limit " +
+  logInfo(s"Found too long generated codes and JIT optimization might 
not work: " +
+s"the bytecode size ($maxCodeSize) is above the limit " +
 s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen 
was disabled " +
 s"for this plan. To avoid this, you can raise the limit " +
-s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
-  return child.execute()
+s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
+  child match {
+// For batch file source scan, we should continue executing it
--- End diff --

It's better to explain why we should continue it. Otherwise later readers 
may not understand it immediately.
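
One possible wording, assuming the special case exists to avoid the re-entrancy described in the PR title (a suggestion sketch, not the PR's final text; the second `case` arm is inferred from the pre-patch behavior):
```scala
child match {
  // A batch FileSourceScanExec wraps itself back in WholeStageCodegenExec inside
  // its doExecute(), so falling back to child.execute() here would re-enter this
  // branch and loop forever; keep running the generated code in that case.
  case f: FileSourceScanExec if f.supportsBatch => // do nothing
  case _ => return child.execute()
}
```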


---




[GitHub] spark pull request #19440: [SPARK-21871][SQL] Fix infinite loop when bytecod...

2017-10-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19440#discussion_r143091966
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -185,4 +185,22 @@ class WholeStageCodegenSuite extends SparkPlanTest 
with SharedSQLContext {
 val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
 assert(maxCodeSize2 > 
SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
+
+  test("returning batch for wide table") {
+import testImplicits._
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val df = spark.range(10).select(Seq.tabulate(201) {i => ('id + 
i).as(s"c$i")} : _*)
+  df.write.mode(SaveMode.Overwrite).parquet(path)
+
+  withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202",
+SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "8000") {
+// donot return batch, because whole stage codegen is disabled for 
wide table (>202 columns)
--- End diff --

This is copied and pasted. Will fix it.


---




[GitHub] spark pull request #19440: [SPARK-21871][SQL] Fix infinite loop when bytecod...

2017-10-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19440#discussion_r143091744
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -185,4 +185,22 @@ class WholeStageCodegenSuite extends SparkPlanTest 
with SharedSQLContext {
 val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
 assert(maxCodeSize2 > 
SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
+
+  test("returning batch for wide table") {
+import testImplicits._
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val df = spark.range(10).select(Seq.tabulate(201) {i => ('id + 
i).as(s"c$i")} : _*)
+  df.write.mode(SaveMode.Overwrite).parquet(path)
+
+  withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202",
+SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "8000") {
+// donot return batch, because whole stage codegen is disabled for 
wide table (>202 columns)
--- End diff --

Is this comment wrong, or do I misunderstand it? It looks like it does return a batch, since it asserts `supportsBatch`.


---




[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/19394
  
What's the other value?



---




[GitHub] spark issue #19363: [Minor]Override toString of KeyValueGroupedDataset

2017-10-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19363
  
cc @viirya Could you review this PR?


---




[GitHub] spark pull request #19440: [SPARK-21871][SQL] Fix infinite loop when bytecod...

2017-10-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19440#discussion_r143089714
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -392,12 +392,16 @@ case class WholeStageCodegenExec(child: SparkPlan) 
extends UnaryExecNode with Co
 
 // Check if compiled code has a too large function
 if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
-  logWarning(s"Found too long generated codes and JIT optimization 
might not work: " +
-s"the bytecode size was $maxCodeSize, this value went over the 
limit " +
+  logInfo(s"Found too long generated codes and JIT optimization might 
not work: " +
+s"the bytecode size ($maxCodeSize) is above the limit " +
 s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen 
was disabled " +
 s"for this plan. To avoid this, you can raise the limit " +
-s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
-  return child.execute()
+s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
+  child match {
+// For batch file source scan, we should continue executing it
+case f: FileSourceScanExec if f.supportsBatch => // do nothing
--- End diff --

If we do it in `FileSourceScanExec`, we are unable to know what caused the fallback. Now we have at least two reasons that trigger the fallback.

Ideally, we should not call `WholeStageCodegenExec` in `doExecute`.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143086739
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

OK, I'll try, but if this PR is ready to merge then there's no need to hold it up for this; we can address it later.
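
For readers following along, a small standalone illustration of what the `wrapped` helper above relies on: `pd.concat` with `axis=1` and `keys=` reassembles the list of `pd.Series` handed to the UDF into a `pandas.DataFrame` carrying the original column names (the data here is made up):
```python
import pandas as pd

# The executor hands the UDF one pd.Series per column; concat with keys=
# rebuilds a DataFrame whose columns are named after the original schema.
id_col = pd.Series([1, 1, 2])
v_col = pd.Series([1.0, 2.0, 3.0])
pdf = pd.concat([id_col, v_col], axis=1, keys=["id", "v"])
print(pdf.columns.tolist())  # ['id', 'v']
```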


---




[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-05 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
I agree with Bryan. I think we might want to rethink the assumption that the toPandas() result with Arrow and without Arrow should be 100% the same.

For instance, the non-Arrow path doesn't respect the session-local timezone, and if for compatibility reasons we cannot fix this in the non-Arrow version, then let's fix it in the Arrow version and document the difference. IMHO, keeping a new feature bug-compatible with the existing feature is not necessary; fixing non-ideal behavior in the new feature provides a migration path off the buggy behavior of the existing feature.
On Thu, Oct 5, 2017 at 7:34 PM Bryan Cutler wrote:

> @ueshin @HyukjinKwon, I think it would be critical for users
> to have timestamps working for Arrow. Just to recap, the remaining issue
> here was that toPandas() without Arrow does not have timestamps with a
> timezone. Is it possible we can document that difference and not hold this
> up for fixing the case without Arrow? Arrow is still disabled by default,
> so the default behavior of toPandas() does not change.



---




[GitHub] spark issue #19440: [SPARK-21871][SQL] Fix infinite loop when bytecode size ...

2017-10-05 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19440
  
Thanks for pinging! LGTM except for one comment.


---




[GitHub] spark pull request #19440: [SPARK-21871][SQL] Fix infinite loop when bytecod...

2017-10-05 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19440#discussion_r143086096
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -392,12 +392,16 @@ case class WholeStageCodegenExec(child: SparkPlan) 
extends UnaryExecNode with Co
 
 // Check if compiled code has a too large function
 if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
-  logWarning(s"Found too long generated codes and JIT optimization 
might not work: " +
-s"the bytecode size was $maxCodeSize, this value went over the 
limit " +
+  logInfo(s"Found too long generated codes and JIT optimization might 
not work: " +
+s"the bytecode size ($maxCodeSize) is above the limit " +
 s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen 
was disabled " +
 s"for this plan. To avoid this, you can raise the limit " +
-s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
-  return child.execute()
+s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
+  child match {
+// For batch file source scan, we should continue executing it
+case f: FileSourceScanExec if f.supportsBatch => // do nothing
--- End diff --

It feels a little weird that `WholeStageCodegenExec` has specific error handling for each Spark plan. Could we handle this error inside `FileSourceScanExec` instead? For example, how about checking `parent.isInstanceOf[WholeStageCodegenExec]` in `FileSourceScanExec`?


---




[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143085886
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -203,6 +203,10 @@ package object config {
   private[spark] val HISTORY_UI_MAX_APPS =
 
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
 
+  private[spark] val UI_SHOW_CONSOLE_PROGRESS = 
ConfigBuilder("spark.ui.showConsoleProgress")
--- End diff --

Please add documentation to this config using `.doc(...)`.
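
A sketch of what that would look like (the doc text, conf type, and default here are assumptions, since the diff above is truncated):
```scala
private[spark] val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
  .doc("When true, show the progress bar in the console.")
  .booleanConf
  .createWithDefault(false)
```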


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143084875
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
+   * Uses Newton-Rhapson method.
* @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet 
Distribution Parameters
*  (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf)
+   * @param logphat Expectation of estimated log-posterior distribution of
+   *topics in a document averaged over the batch.
+   * @param nonEmptyDocsN number of non-empty documents
*/
-  private def updateAlpha(gammat: BDM[Double]): Unit = {
+  private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): 
Unit = {
--- End diff --

The methods would otherwise have to cast `nonEmptyDocsN: Int` to `Double`. This way we get the conversion implicitly, and since the method is private I don't think it's going to hurt.
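
For context, the Newton-Raphson step that `updateAlpha` implements, restated from Section 3.3 of the Huang note cited in the doc comment rather than lifted from the PR (here $N$ is the number of non-empty documents and $\overline{\log\hat p}$ is the averaged `logphat`):

$$g_k = N\Big(\Psi\big(\textstyle\sum_j \alpha_j\big) - \Psi(\alpha_k)\Big) + N\,\overline{\log\hat p}_k,\qquad q_k = -N\,\Psi'(\alpha_k),\qquad c = N\,\Psi'\big(\textstyle\sum_j \alpha_j\big)$$

Because the Hessian has the form $\operatorname{diag}(q) + c\,\mathbf{1}\mathbf{1}^\top$, the Newton update can be applied coordinate-wise:

$$b = \frac{\sum_j g_j / q_j}{1/c + \sum_j 1/q_j},\qquad \alpha_k \leftarrow \alpha_k - \frac{g_k - b}{q_k}$$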


---




[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19394
  
Here's the error message: TestFailedException: 347.5272 was not greater 
than 1000


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143084656
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+Some(BDV.zeros[Double](k))
+  } else {
+None
+  }
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => 
{
--- End diff --

Do you mean the extra spaces after `u` and `v`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143083397
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

Feel free! The tricky bit is that the wrapped function is unknown when calling `pandas_udf`, so there needs to be a way to change it later.

The function chaining also makes it a bit complicated.

I still think we should do it, but we need to be more careful.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143082816
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

I was thinking it should simplify things. Mind if I give it a try? If it only complicates things, then we can forget it.


---




[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/18664
  
@ueshin @HyukjinKwon , I think it would be critical for users to have 
timestamps working for Arrow. Just to recap, the remaining issue here was that 
`toPandas()` without Arrow does not have timestamps with a timezone. Is it 
possible we can document that difference and not hold this up for fixing the 
case without Arrow?  Arrow is still disabled by default, so the default 
behavior of `toPandas()` does not change.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143081782
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

@BryanCutler I got something kind of working here:

https://github.com/icexelloss/spark/pull/5

But I think the change is not trivial and I don't want to complicate this
PR (it is already pretty large).

Should we maybe make the wrapper refactoring its own PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143081592
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
+
+wrapped_udf_obj = pandas_udf(wrapped, returnType)
+udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
--- End diff --

Do you mean sending all columns?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82490 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82490/testReport)**
 for PR 18732 at commit 
[`d628f4e`](https://github.com/apache/spark/commit/d628f4ede208b2206d6469676fb4e0779dc8f320).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143081342
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
 
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => 
{
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
+
+if (nonEmptyDocsN == 0) {
--- End diff --

I don't think that's the case here. But as long as all the cleanup work is 
done, I would not mind it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/19394
  
Not sure - maybe print the chi-square values from the test and see if they
make sense. If they do, we can change the threshold.
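As a rough sketch of what comparing a chi-square value against a threshold
could look like (the bucket counts and the 11.34 cutoff, the 99% critical
value for 3 degrees of freedom, are made-up example numbers, not taken from
the test under discussion):
```
object ChiSquareSketch {
  // Pearson chi-square statistic over observed vs. expected bucket counts.
  def chiSquare(observed: Array[Long], expected: Array[Double]): Double =
    observed.zip(expected).map { case (o, e) => (o - e) * (o - e) / e }.sum

  def main(args: Array[String]): Unit = {
    val observed = Array(48L, 52L, 55L, 45L)
    val expected = Array(50.0, 50.0, 50.0, 50.0)
    val stat = chiSquare(observed, expected)
    println(f"chi-square = $stat%.3f")
    // If printed values like this look sane across runs, the test's threshold
    // can be adjusted to a suitable critical value.
    assert(stat < 11.34, s"statistic $stat exceeded the example threshold")
  }
}
```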



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143080051
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
--- End diff --

Not a style expert myself. Just what I would use:
```
val logphatPartOptionBase = () => {
  if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143080481
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+Some(BDV.zeros[Double](k))
+  } else {
+None
+  }
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => 
{
--- End diff --

Minor: indent.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143080675
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
+   * Uses Newton-Rhapson method.
* @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet 
Distribution Parameters
*  (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf)
+   * @param logphat Expectation of estimated log-posterior distribution of
+   *topics in a document averaged over the batch.
+   * @param nonEmptyDocsN number of non-empty documents
*/
-  private def updateAlpha(gammat: BDM[Double]): Unit = {
+  private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): 
Unit = {
--- End diff --

nonEmptyDocsN: Int ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143080889
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
+
+wrapped_udf_obj = pandas_udf(wrapped, returnType)
+udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
--- End diff --

This sends the entire Spark `DataFrame` to Python; is that what users would
expect for this operation? Should that be mentioned in the doc?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82489 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82489/testReport)**
 for PR 18732 at commit 
[`5162ed1`](https://github.com/apache/spark/commit/5162ed1774bd60477f43bfb020047c3bebe5cc48).
 * This patch **fails Python style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82489/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82489 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82489/testReport)**
 for PR 18732 at commit 
[`5162ed1`](https://github.com/apache/spark/commit/5162ed1774bd60477f43bfb020047c3bebe5cc48).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19394
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82486/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19394
  
**[Test build #82486 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82486/testReport)**
 for PR 19394 at commit 
[`3b43e11`](https://github.com/apache/spark/commit/3b43e112d11051c6ce88490530a3dbf4327efad3).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19394
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2017-10-05 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r143078744
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
+
+  /**
+   * The number of clusters to create (k). Must be  1. Default: 2.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. 
" +
+"Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" 
to use a random vector
+   * as vertex properties, or "degree" to use normalized sum similarities. 
Default: random.
+   */
+  @Since("2.3.0")
+  final val initMode = {
+val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+new Param[String](this, "initMode", "The initialization algorithm. " +
+  "Supported options: 'random' and 'degree'.", allowedParams)
+  }
+
+  /** @group expertGetParam */
+  @Since("2.3.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the column name for ids returned by 
PowerIterationClustering.transform().
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.3.0")
+  val idCol = new Param[String](this, "id", "column name for ids.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getIdCol: String = $(idCol)
+
+  /**
+   * Param for the column name for neighbors required by 
PowerIterationClustering.transform().
+   * Default: "neighbor"
+   * @group param
+   */
+  @Since("2.3.0")
+  val neighborCol = new Param[String](this, "neighbor", "column name for 
neighbors.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getNeighborCol: String = $(neighborCol)
+
+  /**
+   * Validates the input schema
+   * @param schema input schema
+   */
+  protected def validateSchema(schema: StructType): Unit = {
+SchemaUtils.checkColumnType(schema, $(idCol), LongType)
+SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Power Iteration Clustering (PIC), a scalable graph clustering algorithm 
developed by
+ * http://www.icml2010.org/papers/387.pdf>Lin and Cohen. From 
the abstract:
+ * PIC finds a very low-dimensional embedding of a dataset using truncated 
power
+ * iteration on a normalized pair-wise similarity matrix of the data.
+ *
+ * Note that we implement [[PowerIterationClustering]] as a transformer. 
The [[transform]] is an
+ * expensive operation, because it uses PIC algorithm to cluster the whole 
input dataset.
+ *
+ * @see http://en.wikipedia.org/wiki/Spectral_clustering>
+ * Spectral clustering (Wikipedia)
+ */
+@Since("2.3.0")
+@Experimental
+class 

[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2017-10-05 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r143078479
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
+
+  /**
+   * The number of clusters to create (k). Must be  1. Default: 2.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. 
" +
+"Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" 
to use a random vector
+   * as vertex properties, or "degree" to use normalized sum similarities. 
Default: random.
+   */
+  @Since("2.3.0")
+  final val initMode = {
+val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+new Param[String](this, "initMode", "The initialization algorithm. " +
+  "Supported options: 'random' and 'degree'.", allowedParams)
+  }
+
+  /** @group expertGetParam */
+  @Since("2.3.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the column name for ids returned by 
PowerIterationClustering.transform().
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.3.0")
+  val idCol = new Param[String](this, "id", "column name for ids.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getIdCol: String = $(idCol)
+
+  /**
+   * Param for the column name for neighbors required by 
PowerIterationClustering.transform().
+   * Default: "neighbor"
+   * @group param
+   */
+  @Since("2.3.0")
+  val neighborCol = new Param[String](this, "neighbor", "column name for 
neighbors.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getNeighborCol: String = $(neighborCol)
+
+  /**
+   * Validates the input schema
+   * @param schema input schema
+   */
+  protected def validateSchema(schema: StructType): Unit = {
+SchemaUtils.checkColumnType(schema, $(idCol), LongType)
+SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Power Iteration Clustering (PIC), a scalable graph clustering algorithm 
developed by
+ * http://www.icml2010.org/papers/387.pdf>Lin and Cohen. From 
the abstract:
+ * PIC finds a very low-dimensional embedding of a dataset using truncated 
power
+ * iteration on a normalized pair-wise similarity matrix of the data.
+ *
+ * Note that we implement [[PowerIterationClustering]] as a transformer. 
The [[transform]] is an
+ * expensive operation, because it uses PIC algorithm to cluster the whole 
input dataset.
+ *
+ * @see http://en.wikipedia.org/wiki/Spectral_clustering>
+ * Spectral clustering (Wikipedia)
+ */
+@Since("2.3.0")
+@Experimental
+class 

[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18924
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82487/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18924
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18924
  
**[Test build #82487 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82487/testReport)**
 for PR 18924 at commit 
[`2942082`](https://github.com/apache/spark/commit/294208217b0cbf0fe745b7a8c603ec2f5675f5dc).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19061
  
**[Test build #82488 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82488/testReport)**
 for PR 19061 at commit 
[`6e59cf1`](https://github.com/apache/spark/commit/6e59cf1057a1f9d2584256a3423421c9f6924da1).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...

2017-10-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19061
  
The PR is updated. Thank you for the review, @vanzin!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143077626
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
--- End diff --

I see. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143077148
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
-  val input: Array[InternalRow] = child.executeCollect()
-  if (input.length >= 51200) {
+  val (numRows, input) = child.executeCollectIterator()
+  if (numRows >= 51200) {
 throw new SparkException(
-  s"Cannot broadcast the table with more than 512 millions 
rows: ${input.length} rows")
+  s"Cannot broadcast the table with more than 512 millions 
rows: $numRows rows")
   }
+
   val beforeBuild = System.nanoTime()
   longMetric("collectTime") += (beforeBuild - beforeCollect) / 
100
-  val dataSize = 
input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+  // Construct the relation.
+  val relation = mode.transform(input, Some(numRows))
+
+  val dataSize = relation match {
+case map: HashedRelation =>
+  map.estimatedSize
+case arr: Array[InternalRow] =>
+  arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+case _ =>
+  numRows * 512 // guess: each row is about 512 bytes
--- End diff --

We do the check at line 99, so this guess could cause a false alarm. That
means this could cause a regression, right?
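To make the false-alarm concern concrete, a rough sketch (the 8 GB limit
mirrors the size check mentioned above as I understand it, and the row counts
and byte sizes below are invented numbers):
```
object BroadcastSizeGuessSketch {
  def main(args: Array[String]): Unit = {
    val limitBytes = 8L << 30            // assumed broadcast size limit
    val numRows = 20L * 1000 * 1000      // hypothetical 20M-row build side
    val guessedSize = numRows * 512      // the "512 bytes per row" guess
    val actualSize = numRows * 100L      // suppose rows are really ~100 bytes

    // The guess trips the limit while the real size would not: a query that
    // used to broadcast fine could now fail, i.e. a potential regression.
    println(s"guessed = $guessedSize, actual = $actualSize, limit = $limitBytes")
    assert(guessedSize > limitBytes && actualSize < limitBytes)
  }
}
```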


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143076954
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -434,7 +434,7 @@ class SparkContext(config: SparkConf) extends Logging {
 _statusTracker = new SparkStatusTracker(this)
 
 _progressBar =
-  if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && 
!log.isInfoEnabled) {
+  if (_conf.getBoolean(UI_SHOW_CONSOLE_PROGRESS.key, false) && 
!log.isInfoEnabled) {
--- End diff --

Oh, never mind.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143076726
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,11 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource) && 
!sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS.key)) {
--- End diff --

And, this?
```
if (isShell(args.primaryResource) && 
!sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
...
}
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143076481
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -434,7 +434,7 @@ class SparkContext(config: SparkConf) extends Logging {
 _statusTracker = new SparkStatusTracker(this)
 
 _progressBar =
-  if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && 
!log.isInfoEnabled) {
+  if (_conf.getBoolean(UI_SHOW_CONSOLE_PROGRESS.key, false) && 
!log.isInfoEnabled) {
--- End diff --

Thanks. Do you mean this?
```
if (_conf.get(UI_SHOW_CONSOLE_PROGRESS)) {
...
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143073410
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -203,6 +203,10 @@ package object config {
   private[spark] val HISTORY_UI_MAX_APPS =
 
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
 
+  private[spark] val UI_SHOW_CONSOLE_PROGRESS = 
ConfigBuilder("spark.ui.showConsoleProgress")
+.booleanConf
+.createOptional
--- End diff --

`createWithDefault(false)`
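For reference, the entry with that change applied would look roughly like the
sketch below; the `doc` string is my own wording, not from the PR:
```
private[spark] val UI_SHOW_CONSOLE_PROGRESS =
  ConfigBuilder("spark.ui.showConsoleProgress")
    .doc("Whether to show the progress bar in the console.")  // illustrative doc text
    .booleanConf
    .createWithDefault(false)
```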


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143073506
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,11 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by 
default or by user.
+if (isShell(args.primaryResource) && 
!sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS.key)) {
--- End diff --

`!sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143073447
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -434,7 +434,7 @@ class SparkContext(config: SparkConf) extends Logging {
 _statusTracker = new SparkStatusTracker(this)
 
 _progressBar =
-  if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && 
!log.isInfoEnabled) {
+  if (_conf.getBoolean(UI_SHOW_CONSOLE_PROGRESS.key, false) && 
!log.isInfoEnabled) {
--- End diff --

`_conf.get(UI_SHOW_CONSOLE_PROGRESS)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143073586
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -399,6 +399,18 @@ class SparkSubmitSuite
 mainClass should be ("org.apache.spark.deploy.yarn.Client")
   }
 
+  test("SPARK-21568 ConsoleProgressBar should be enabled only in shells") {
+val clArgs1 = Seq("--class", "org.apache.spark.repl.Main", 
"spark-shell")
+val appArgs1 = new SparkSubmitArguments(clArgs1)
+val (_, _, sysProps1, _) = prepareSubmitEnvironment(appArgs1)
+sysProps1("spark.ui.showConsoleProgress") should be ("true")
+
+val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
+val appArgs2 = new SparkSubmitArguments(clArgs2)
+val (_, _, sysProps2, _) = prepareSubmitEnvironment(appArgs2)
+sysProps2.keys should not contain "spark.ui.showConsoleProgress"
--- End diff --

`UI_SHOW_CONSOLE_PROGRESS.key`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-05 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r143073559
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -399,6 +399,18 @@ class SparkSubmitSuite
 mainClass should be ("org.apache.spark.deploy.yarn.Client")
   }
 
+  test("SPARK-21568 ConsoleProgressBar should be enabled only in shells") {
+val clArgs1 = Seq("--class", "org.apache.spark.repl.Main", 
"spark-shell")
+val appArgs1 = new SparkSubmitArguments(clArgs1)
+val (_, _, sysProps1, _) = prepareSubmitEnvironment(appArgs1)
+sysProps1("spark.ui.showConsoleProgress") should be ("true")
--- End diff --

`UI_SHOW_CONSOLE_PROGRESS.key`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...

2017-10-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19424#discussion_r143072487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeMap, 
AttributeSet, Expression, ExpressionSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Pushes down various operators to the underlying data source for better 
performance. We classify
+ * operators into different layers, operators in the same layer are 
orderless, i.e. the query result
+ * won't change if we switch the operators within a layer(e.g. we can 
switch the order of predicates
+ * and required columns). The operators in layer N can only be pushed down 
if operators in layer N-1
+ * that above the data source relation are all pushed down. As an example, 
you can't push down limit
+ * if a filter below limit is not pushed down.
+ *
+ * Current operator push down layers:
+ *   layer 1: predicates, required columns.
+ */
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
--- End diff --

Can we add a test suite for the unit test cases of this rule?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143069049
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
--- End diff --

About `logphatPartOptionBase`: I tried that initially and it failed. This was
discussed above with @WeichenXu123. The problem is caused by the in-place
modifications.
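To illustrate the hazard in plain Scala (this is an ordinary fold, not Spark's
treeAggregate; it only shows why handing out a fresh zero via a `() =>` factory
is the safer pattern when the accumulator is updated in place):
```
object SharedZeroHazard {
  // Sums rows into `zero` in place and returns the same buffer.
  def foldInPlace(zero: Array[Double], xs: Seq[Array[Double]]): Array[Double] =
    xs.foldLeft(zero) { (acc, x) =>
      for (i <- acc.indices) acc(i) += x(i)
      acc
    }

  def main(args: Array[String]): Unit = {
    val sharedZero = Array(0.0, 0.0)                 // one shared instance
    val a = foldInPlace(sharedZero, Seq(Array(1.0, 1.0)))
    val b = foldInPlace(sharedZero, Seq(Array(2.0, 2.0)))
    // Both folds mutated the same buffer, so both results are contaminated.
    println(a.mkString(", "))                        // 3.0, 3.0
    println(b.mkString(", "))                        // 3.0, 3.0
    // A factory such as `() => Array(0.0, 0.0)` gives every fold its own zero.
  }
}
```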


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143068229
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
--- End diff --

About the comments: keep them as you wish.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-10-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18924
  
**[Test build #82487 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82487/testReport)**
 for PR 18924 at commit 
[`2942082`](https://github.com/apache/spark/commit/294208217b0cbf0fe745b7a8c603ec2f5675f5dc).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143067455
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
--- End diff --

Maybe define logphatPartOptionBase as an Option rather than a function:
val logphatPartOptionBase = if (optimizeDocConcentration) {
  Some(BDV.zeros[Double](k))
} else {
  None
}


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143066229
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of loghat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
 
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => 
{
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
+
+if (nonEmptyDocsN == 0) {
--- End diff --

But the Spark Scala style guide says:

"... return is preferred: Use `return` as a guard to simplify
control flow without adding a level of indentation".
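A tiny self-contained sketch of that guard style applied to the empty-batch
case (the warning text and cleanup hook are placeholders, not the PR's exact
code):
```
object GuardStyleSketch {
  def update(nonEmptyDocsN: Long, cleanup: () => Unit): Unit = {
    if (nonEmptyDocsN == 0) {
      // Guard: nothing to learn from an empty mini-batch; clean up and bail
      // out instead of indenting the whole normal path one level deeper.
      cleanup()
      println("WARN: No non-empty documents were submitted in the batch.")
      return
    }
    // ... the normal update path would go here ...
    println(s"Updating with $nonEmptyDocsN non-empty documents.")
  }

  def main(args: Array[String]): Unit = {
    update(0L, () => println("destroying broadcast variables"))
    update(3L, () => println("destroying broadcast variables"))
  }
}
```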


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-10-05 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r143048261
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/Word2VecCBOWSolver.scala ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.mllib.feature
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.random.XORShiftRandom
+
+object Word2VecCBOWSolver extends Logging {
+  // learning rate is updated for every batch of size batchSize
+  private val batchSize = 1
+
+  // power to raise the unigram distribution with
+  private val power = 0.75
+
+  private val MAX_EXP = 6
+
+  case class Vocabulary(
+totalWordCount: Long,
+vocabMap: Map[String, Int],
+unigramTable: Array[Int],
+samplingTable: Array[Float])
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words based 
implementation using
+   * negative sampling optimization, using BLAS for vectorizing operations 
where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based 
estimation.
+   * We divide input data into N equally sized random partitions.
+   * We then generate initial weights and broadcast them to the N 
partitions. This way
+   * all the partitions start with the same initial weights. We then run N 
independent
+   * estimations that each estimate a model on a partition. The weights 
learned
+   * from each of the N models are averaged and rebroadcast the weights.
+   * This process is repeated `maxIter` number of times.
+   *
+   * @param input A RDD of strings. Each string would be considered a 
sentence.
+   * @return Estimated word2vec model
+   */
+  def fitCBOW[S <: Iterable[String]](
+  word2Vec: Word2Vec,
+  input: RDD[S]): feature.Word2VecModel = {
+
+val negativeSamples = word2Vec.getNegativeSamples
+val sample = word2Vec.getSample
+
+val Vocabulary(totalWordCount, vocabMap, uniTable, sampleTable) =
+  generateVocab(input, word2Vec.getMinCount, sample, 
word2Vec.getUnigramTableSize)
+val vocabSize = vocabMap.size
+
+assert(negativeSamples < vocabSize, s"Vocab size ($vocabSize) cannot 
be smaller" +
+  s" than negative samples($negativeSamples)")
+
+val seed = word2Vec.getSeed
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = word2Vec.getVectorSize
+val syn0Global = Array.fill(vocabSize * 
vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapBroadcast = sc.broadcast(vocabMap)
+val unigramTableBroadcast = sc.broadcast(uniTable)
+val sampleTableBroadcast = sc.broadcast(sampleTable)
+
+val windowSize = word2Vec.getWindowSize
+val maxSentenceLength = word2Vec.getMaxSentenceLength
+val numPartitions = word2Vec.getNumPartitions
+
+val digitSentences = input.flatMap { sentence =>
+  val wordIndexes = sentence.flatMap(vocabMapBroadcast.value.get)
+  wordIndexes.grouped(maxSentenceLength).map(_.toArray)
+}.repartition(numPartitions).cache()
+
+val learningRate = word2Vec.getStepSize
+
+val wordsPerPartition = totalWordCount / numPartitions
+
+logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
$totalWordCount")
+
+val maxIter = word2Vec.getMaxIter
+for {iteration <- 1 to maxIter} {
+  logInfo(s"Starting iteration: $iteration")
+  val iterationStartTime = System.nanoTime()
+
+  val syn0bc = sc.broadcast(syn0Global)
+  val syn1bc = sc.broadcast(syn1Global)
+
+  val partialFits = digitSentences.mapPartitionsWithIndex { case (i_, 

[GitHub] spark pull request #17673: [SPARK-20372] [ML] Word2Vec Continuous Bag of Wor...

2017-10-05 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/17673#discussion_r143063348
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/Word2VecCBOWSolver.scala ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.mllib.feature
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.random.XORShiftRandom
+
+object Word2VecCBOWSolver extends Logging {
+  // learning rate is updated for every batch of size batchSize
+  private val batchSize = 1
+
+  // power to raise the unigram distribution with
+  private val power = 0.75
+
+  private val MAX_EXP = 6
+
+  case class Vocabulary(
+totalWordCount: Long,
+vocabMap: Map[String, Int],
+unigramTable: Array[Int],
+samplingTable: Array[Float])
+
+  /**
+   * This method implements Word2Vec Continuous Bag Of Words (CBOW) estimation with
+   * negative sampling optimization, using BLAS for vectorized operations where applicable.
+   * The algorithm is parallelized in the same way as the skip-gram based estimation.
+   * We divide the input data into N equally sized random partitions.
+   * We then generate initial weights and broadcast them to the N partitions. This way
+   * all the partitions start with the same initial weights. We then run N independent
+   * estimations that each fit a model on one partition. The weights learned
+   * from each of the N models are averaged and rebroadcast.
+   * This process is repeated `maxIter` times.
+   *
+   * @param input An RDD of strings. Each string is treated as a sentence.
+   * @return Estimated word2vec model
+   */
+  def fitCBOW[S <: Iterable[String]](
+  word2Vec: Word2Vec,
+  input: RDD[S]): feature.Word2VecModel = {
+
+val negativeSamples = word2Vec.getNegativeSamples
+val sample = word2Vec.getSample
+
+val Vocabulary(totalWordCount, vocabMap, uniTable, sampleTable) =
+  generateVocab(input, word2Vec.getMinCount, sample, word2Vec.getUnigramTableSize)
+val vocabSize = vocabMap.size
+
+assert(negativeSamples < vocabSize, s"Vocab size ($vocabSize) cannot be smaller" +
+  s" than negative samples($negativeSamples)")
+
+val seed = word2Vec.getSeed
+val initRandom = new XORShiftRandom(seed)
+
+val vectorSize = word2Vec.getVectorSize
+val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f)
+val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
+
+val sc = input.context
+
+val vocabMapBroadcast = sc.broadcast(vocabMap)
+val unigramTableBroadcast = sc.broadcast(uniTable)
+val sampleTableBroadcast = sc.broadcast(sampleTable)
+
+val windowSize = word2Vec.getWindowSize
+val maxSentenceLength = word2Vec.getMaxSentenceLength
+val numPartitions = word2Vec.getNumPartitions
+
+val digitSentences = input.flatMap { sentence =>
+  val wordIndexes = sentence.flatMap(vocabMapBroadcast.value.get)
+  wordIndexes.grouped(maxSentenceLength).map(_.toArray)
+}.repartition(numPartitions).cache()
+
+val learningRate = word2Vec.getStepSize
+
+val wordsPerPartition = totalWordCount / numPartitions
+
+logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
$totalWordCount")
+
+val maxIter = word2Vec.getMaxIter
+for {iteration <- 1 to maxIter} {
+  logInfo(s"Starting iteration: $iteration")
+  val iterationStartTime = System.nanoTime()
+
+  val syn0bc = sc.broadcast(syn0Global)
+  val syn1bc = sc.broadcast(syn1Global)
+
+  val partialFits = digitSentences.mapPartitionsWithIndex { case (i_, 
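
For background on the `power = 0.75` constant and the `unigramTable` field quoted above: word2vec's negative sampling conventionally draws words from the unigram distribution raised to the 3/4 power, precomputed as a fixed-size integer table so that a single uniform random index yields a sample. A minimal sketch of that standard construction follows; it assumes only per-word counts and a table size, and may differ in detail from this PR's `generateVocab`:

```
import scala.util.Random

object UnigramTableSketch {
  // Sketch of the standard word2vec negative-sampling table: each word id
  // occupies a slice of the table proportional to count^0.75, so a uniform
  // random index samples from the smoothed unigram distribution.
  def buildUnigramTable(wordCounts: Array[Long], tableSize: Int, power: Double = 0.75): Array[Int] = {
    require(wordCounts.nonEmpty && tableSize > 0)
    val table = new Array[Int](tableSize)
    val total = wordCounts.map(c => math.pow(c.toDouble, power)).sum
    var wordId = 0
    var cumulative = math.pow(wordCounts(0).toDouble, power) / total
    for (i <- 0 until tableSize) {
      table(i) = wordId
      if (i.toDouble / tableSize > cumulative && wordId < wordCounts.length - 1) {
        wordId += 1
        cumulative += math.pow(wordCounts(wordId).toDouble, power) / total
      }
    }
    table
  }

  // Drawing one negative sample is a single uniform index into the table.
  def sampleNegative(table: Array[Int], rng: Random): Int = table(rng.nextInt(table.length))
}
```

Because the table is read-only and shared by every partition, broadcasting it once (as the quoted code does with `unigramTableBroadcast`) avoids shipping it with every task closure.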
