spark git commit: [SPARK-11897][SQL] Add @scala.annotations.varargs to sql functions

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4021a28ac -> 12eea834d


[SPARK-11897][SQL] Add @scala.annotations.varargs to sql functions

Author: Xiu Guo 

Closes #9918 from xguo27/SPARK-11897.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12eea834
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12eea834
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12eea834

Branch: refs/heads/master
Commit: 12eea834d7382fbaa9c92182b682b8724049d7c1
Parents: 4021a28
Author: Xiu Guo 
Authored: Tue Nov 24 00:07:40 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 00:07:40 2015 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/12eea834/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index b27b134..6137ce3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -689,6 +689,7 @@ object functions extends LegacyFunctions {
* @group normal_funcs
* @since 1.4.0
*/
+  @scala.annotation.varargs
   def array(colName: String, colNames: String*): Column = {
 array((colName +: colNames).map(col) : _*)
   }
@@ -871,6 +872,7 @@ object functions extends LegacyFunctions {
* @group normal_funcs
* @since 1.4.0
*/
+  @scala.annotation.varargs
   def struct(colName: String, colNames: String*): Column = {
 struct((colName +: colNames).map(col) : _*)
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11897][SQL] Add @scala.annotations.varargs to sql functions

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 21e63929c -> e1b0a2376


[SPARK-11897][SQL] Add @scala.annotations.varargs to sql functions

Author: Xiu Guo 

Closes #9918 from xguo27/SPARK-11897.

(cherry picked from commit 12eea834d7382fbaa9c92182b682b8724049d7c1)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1b0a237
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1b0a237
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1b0a237

Branch: refs/heads/branch-1.6
Commit: e1b0a2376d9a78c14195bf50bd02004554097fba
Parents: 21e6392
Author: Xiu Guo 
Authored: Tue Nov 24 00:07:40 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 00:07:48 2015 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e1b0a237/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index b27b134..6137ce3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -689,6 +689,7 @@ object functions extends LegacyFunctions {
* @group normal_funcs
* @since 1.4.0
*/
+  @scala.annotation.varargs
   def array(colName: String, colNames: String*): Column = {
 array((colName +: colNames).map(col) : _*)
   }
@@ -871,6 +872,7 @@ object functions extends LegacyFunctions {
* @group normal_funcs
* @since 1.4.0
*/
+  @scala.annotation.varargs
   def struct(colName: String, colNames: String*): Column = {
 struct((colName +: colNames).map(col) : _*)
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11906][WEB UI] Speculation Tasks Cause ProgressBar UI Overflow

2015-11-24 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e1b0a2376 -> 17ea95133


[SPARK-11906][WEB UI] Speculation Tasks Cause ProgressBar UI Overflow

When there are speculative tasks in the stage, running progress bar could 
overflow and goes hidden on a new line:
![image](https://cloud.githubusercontent.com/assets/4317392/11326841/5fd3482e-9142-11e5-8ca5-cb2f0c0c8964.png)
3 completed / 2 running (including 1 speculative) out of 4 total tasks

This is a simple fix by capping the started tasks at `total - completed` tasks
![image](https://cloud.githubusercontent.com/assets/4317392/11326842/6bb67260-9142-11e5-90f0-37f9174878ec.png)

I should note my preferred way to fix it is via css style
```css
.progress { display: flex; }
```
which shifts the correction burden from driver to web browser. However I 
couldn't get selenium test to measure the position/dimension of the progress 
bar correctly to get this unit tested.

It also has the side effect that the width will be calibrated so the running 
occupies 2 / 5 instead of 1 / 4.
![image](https://cloud.githubusercontent.com/assets/4317392/11326848/7b03e9f0-9142-11e5-89ad-bd99cb0647cf.png)

All in all, since this cosmetic bug is minor enough, I suppose the original 
simple fix should be good enough.

Author: Forest Fang 

Closes #9896 from saurfang/progressbar.

(cherry picked from commit 800bd799acf7f10a469d8d6537279953129eb2c6)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17ea9513
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17ea9513
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17ea9513

Branch: refs/heads/branch-1.6
Commit: 17ea9513313187cfb6e7c7e66e6e469c5cef63cf
Parents: e1b0a23
Author: Forest Fang 
Authored: Tue Nov 24 09:03:32 2015 +
Committer: Sean Owen 
Committed: Tue Nov 24 09:03:44 2015 +

--
 core/src/main/scala/org/apache/spark/ui/UIUtils.scala |  4 +++-
 .../src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala | 10 ++
 2 files changed, 13 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/17ea9513/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 25dcb60..84a1116 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -319,7 +319,9 @@ private[spark] object UIUtils extends Logging {
   skipped: Int,
   total: Int): Seq[Node] = {
 val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
-val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+// started + completed can be > total when there are speculative tasks
+val boundedStarted = math.min(started, total - completed)
+val startWidth = "width: %s%%".format((boundedStarted.toDouble/total)*100)
 
 
   

http://git-wip-us.apache.org/repos/asf/spark/blob/17ea9513/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
index 2b693c1..dd8d5ec 100644
--- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -57,6 +57,16 @@ class UIUtilsSuite extends SparkFunSuite {
 )
   }
 
+  test("SPARK-11906: Progress bar should not overflow because of speculative 
tasks") {
+val generated = makeProgressBar(2, 3, 0, 0, 4).head.child.filter(_.label 
== "div")
+val expected = Seq(
+  ,
+  
+)
+assert(generated.sameElements(expected),
+  s"\nRunning progress bar should round 
down\n\nExpected:\n$expected\nGenerated:\n$generated")
+  }
+
   private def verify(
   desc: String, expected: Elem, errorMsg: String = "", baseUrl: String = 
""): Unit = {
 val generated = makeDescription(desc, baseUrl)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11906][WEB UI] Speculation Tasks Cause ProgressBar UI Overflow

2015-11-24 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 12eea834d -> 800bd799a


[SPARK-11906][WEB UI] Speculation Tasks Cause ProgressBar UI Overflow

When there are speculative tasks in the stage, running progress bar could 
overflow and goes hidden on a new line:
![image](https://cloud.githubusercontent.com/assets/4317392/11326841/5fd3482e-9142-11e5-8ca5-cb2f0c0c8964.png)
3 completed / 2 running (including 1 speculative) out of 4 total tasks

This is a simple fix by capping the started tasks at `total - completed` tasks
![image](https://cloud.githubusercontent.com/assets/4317392/11326842/6bb67260-9142-11e5-90f0-37f9174878ec.png)

I should note my preferred way to fix it is via css style
```css
.progress { display: flex; }
```
which shifts the correction burden from driver to web browser. However I 
couldn't get selenium test to measure the position/dimension of the progress 
bar correctly to get this unit tested.

It also has the side effect that the width will be calibrated so the running 
occupies 2 / 5 instead of 1 / 4.
![image](https://cloud.githubusercontent.com/assets/4317392/11326848/7b03e9f0-9142-11e5-89ad-bd99cb0647cf.png)

All in all, since this cosmetic bug is minor enough, I suppose the original 
simple fix should be good enough.

Author: Forest Fang 

Closes #9896 from saurfang/progressbar.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/800bd799
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/800bd799
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/800bd799

Branch: refs/heads/master
Commit: 800bd799acf7f10a469d8d6537279953129eb2c6
Parents: 12eea83
Author: Forest Fang 
Authored: Tue Nov 24 09:03:32 2015 +
Committer: Sean Owen 
Committed: Tue Nov 24 09:03:32 2015 +

--
 core/src/main/scala/org/apache/spark/ui/UIUtils.scala |  4 +++-
 .../src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala | 10 ++
 2 files changed, 13 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/800bd799/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 25dcb60..84a1116 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -319,7 +319,9 @@ private[spark] object UIUtils extends Logging {
   skipped: Int,
   total: Int): Seq[Node] = {
 val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
-val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+// started + completed can be > total when there are speculative tasks
+val boundedStarted = math.min(started, total - completed)
+val startWidth = "width: %s%%".format((boundedStarted.toDouble/total)*100)
 
 
   

http://git-wip-us.apache.org/repos/asf/spark/blob/800bd799/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
index 2b693c1..dd8d5ec 100644
--- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -57,6 +57,16 @@ class UIUtilsSuite extends SparkFunSuite {
 )
   }
 
+  test("SPARK-11906: Progress bar should not overflow because of speculative 
tasks") {
+val generated = makeProgressBar(2, 3, 0, 0, 4).head.child.filter(_.label 
== "div")
+val expected = Seq(
+  ,
+  
+)
+assert(generated.sameElements(expected),
+  s"\nRunning progress bar should round 
down\n\nExpected:\n$expected\nGenerated:\n$generated")
+  }
+
   private def verify(
   desc: String, expected: Elem, errorMsg: String = "", baseUrl: String = 
""): Unit = {
 val generated = makeDescription(desc, baseUrl)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11043][SQL] BugFix:Set the operator log in the thrift server.

2015-11-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 800bd799a -> d4a5e6f71


[SPARK-11043][SQL] BugFix:Set the operator log in the thrift server.

`SessionManager` will set the `operationLog` if the configuration 
`hive.server2.logging.operation.enabled` is true in version of hive 1.2.1.
But the spark did not adapt to this change, so no matter enabled the 
configuration or not, spark thrift server will always log the warn message.
PS: if `hive.server2.logging.operation.enabled` is false, it should log the 
warn message (the same as hive thrift server).

Author: huangzhaowei 

Closes #9056 from SaintBacchus/SPARK-11043.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4a5e6f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4a5e6f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4a5e6f7

Branch: refs/heads/master
Commit: d4a5e6f719079639ffd38470f4d8d1f6fde3228d
Parents: 800bd79
Author: huangzhaowei 
Authored: Tue Nov 24 23:24:49 2015 +0800
Committer: Cheng Lian 
Committed: Tue Nov 24 23:24:49 2015 +0800

--
 .../SparkExecuteStatementOperation.scala|  8 
 .../hive/thriftserver/SparkSQLSessionManager.scala  |  5 +
 .../hive/thriftserver/HiveThriftServer2Suites.scala | 16 +++-
 3 files changed, 24 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d4a5e6f7/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 82fef92..e022ee8 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -134,12 +134,12 @@ private[hive] class SparkExecuteStatementOperation(
 
   def getResultSetSchema: TableSchema = resultSchema
 
-  override def run(): Unit = {
+  override def runInternal(): Unit = {
 setState(OperationState.PENDING)
 setHasResultSet(true) // avoid no resultset for async run
 
 if (!runInBackground) {
-  runInternal()
+  execute()
 } else {
   val sparkServiceUGI = Utils.getUGI()
 
@@ -151,7 +151,7 @@ private[hive] class SparkExecuteStatementOperation(
   val doAsAction = new PrivilegedExceptionAction[Unit]() {
 override def run(): Unit = {
   try {
-runInternal()
+execute()
   } catch {
 case e: HiveSQLException =>
   setOperationException(e)
@@ -188,7 +188,7 @@ private[hive] class SparkExecuteStatementOperation(
 }
   }
 
-  override def runInternal(): Unit = {
+  private def execute(): Unit = {
 statementId = UUID.randomUUID().toString
 logInfo(s"Running query '$statement' with $statementId")
 setState(OperationState.RUNNING)

http://git-wip-us.apache.org/repos/asf/spark/blob/d4a5e6f7/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index af4fcdf..de4e9c6 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -41,6 +41,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: 
HiveServer2, hiveContext:
   override def init(hiveConf: HiveConf) {
 setSuperField(this, "hiveConf", hiveConf)
 
+// Create operation log root directory, if operation logging is enabled
+if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+  invoke(classOf[SessionManager], this, "initOperationLogRootDir")
+}
+
 val backgroundPoolSize = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
 setSuperField(this, "backgroundOperationPool", 
Executors.newFixedThreadPool(backgroundPoolSize))
 getAncestorField[Log](this, 3, "LOG").info(

http://git-wip-us.apache.org/repos/asf/spark/blob/d4a5e6f7/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

spark git commit: [SPARK-11043][SQL] BugFix:Set the operator log in the thrift server.

2015-11-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 17ea95133 -> f1f2cee4c


[SPARK-11043][SQL] BugFix:Set the operator log in the thrift server.

`SessionManager` will set the `operationLog` if the configuration 
`hive.server2.logging.operation.enabled` is true in version of hive 1.2.1.
But the spark did not adapt to this change, so no matter enabled the 
configuration or not, spark thrift server will always log the warn message.
PS: if `hive.server2.logging.operation.enabled` is false, it should log the 
warn message (the same as hive thrift server).

Author: huangzhaowei 

Closes #9056 from SaintBacchus/SPARK-11043.

(cherry picked from commit d4a5e6f719079639ffd38470f4d8d1f6fde3228d)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1f2cee4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1f2cee4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1f2cee4

Branch: refs/heads/branch-1.6
Commit: f1f2cee4cebd624f86a3c59297aeb77e5a839e2a
Parents: 17ea951
Author: huangzhaowei 
Authored: Tue Nov 24 23:24:49 2015 +0800
Committer: Cheng Lian 
Committed: Tue Nov 24 23:25:14 2015 +0800

--
 .../SparkExecuteStatementOperation.scala|  8 
 .../hive/thriftserver/SparkSQLSessionManager.scala  |  5 +
 .../hive/thriftserver/HiveThriftServer2Suites.scala | 16 +++-
 3 files changed, 24 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f1f2cee4/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 82fef92..e022ee8 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -134,12 +134,12 @@ private[hive] class SparkExecuteStatementOperation(
 
   def getResultSetSchema: TableSchema = resultSchema
 
-  override def run(): Unit = {
+  override def runInternal(): Unit = {
 setState(OperationState.PENDING)
 setHasResultSet(true) // avoid no resultset for async run
 
 if (!runInBackground) {
-  runInternal()
+  execute()
 } else {
   val sparkServiceUGI = Utils.getUGI()
 
@@ -151,7 +151,7 @@ private[hive] class SparkExecuteStatementOperation(
   val doAsAction = new PrivilegedExceptionAction[Unit]() {
 override def run(): Unit = {
   try {
-runInternal()
+execute()
   } catch {
 case e: HiveSQLException =>
   setOperationException(e)
@@ -188,7 +188,7 @@ private[hive] class SparkExecuteStatementOperation(
 }
   }
 
-  override def runInternal(): Unit = {
+  private def execute(): Unit = {
 statementId = UUID.randomUUID().toString
 logInfo(s"Running query '$statement' with $statementId")
 setState(OperationState.RUNNING)

http://git-wip-us.apache.org/repos/asf/spark/blob/f1f2cee4/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index af4fcdf..de4e9c6 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -41,6 +41,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: 
HiveServer2, hiveContext:
   override def init(hiveConf: HiveConf) {
 setSuperField(this, "hiveConf", hiveConf)
 
+// Create operation log root directory, if operation logging is enabled
+if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+  invoke(classOf[SessionManager], this, "initOperationLogRootDir")
+}
+
 val backgroundPoolSize = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
 setSuperField(this, "backgroundOperationPool", 
Executors.newFixedThreadPool(backgroundPoolSize))
 getAncestorField[Log](this, 3, "LOG").info(

http://git-wip-us.apache.org/repos/asf/spark/blob/f1f2cee4/sql/hive-thriftserver/src/test/scala/org/apache/spark

spark git commit: [SPARK-11592][SQL] flush spark-sql command line history to history file

2015-11-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master d4a5e6f71 -> 5889880fb


[SPARK-11592][SQL] flush spark-sql command line history to history file

Currently, `spark-sql` would not flush command history when exiting.

Author: Daoyuan Wang 

Closes #9563 from adrian-wang/jline.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5889880f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5889880f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5889880f

Branch: refs/heads/master
Commit: 5889880fbe9628681042036892ef7ebd4f0857b4
Parents: d4a5e6f
Author: Daoyuan Wang 
Authored: Tue Nov 24 23:32:05 2015 +0800
Committer: Cheng Lian 
Committed: Tue Nov 24 23:32:05 2015 +0800

--
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala   | 16 
 1 file changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5889880f/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 6419002..4b928e6 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -194,6 +194,22 @@ private[hive] object SparkSQLCLIDriver extends Logging {
 logWarning(e.getMessage)
 }
 
+// add shutdown hook to flush the history to history file
+Runtime.getRuntime.addShutdownHook(new Thread(new Runnable() {
+  override def run() = {
+reader.getHistory match {
+  case h: FileHistory =>
+try {
+  h.flush()
+} catch {
+  case e: IOException =>
+logWarning("WARNING: Failed to write command history file: " + 
e.getMessage)
+}
+  case _ =>
+}
+  }
+}))
+
 // TODO: missing
 /*
 val clientTransportTSocketField = 
classOf[CliSessionState].getDeclaredField("transport")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from …

2015-11-24 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 5889880fb -> be9dd1550


[SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from …

…parent class loader

Without patch, two additional tests of ExecutorClassLoaderSuite fails.

- "resource from parent"
- "resources from parent"

Detailed explanation is here, 
https://issues.apache.org/jira/browse/SPARK-11818?focusedCommentId=15011202&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15011202

Author: Jungtaek Lim 

Closes #9812 from HeartSaVioR/SPARK-11818.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be9dd155
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be9dd155
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be9dd155

Branch: refs/heads/master
Commit: be9dd1550c1816559d3d418a19c692e715f1c94e
Parents: 5889880
Author: Jungtaek Lim 
Authored: Tue Nov 24 09:20:09 2015 -0800
Committer: Marcelo Vanzin 
Committed: Tue Nov 24 09:20:09 2015 -0800

--
 .../apache/spark/repl/ExecutorClassLoader.scala | 12 +++-
 .../spark/repl/ExecutorClassLoaderSuite.scala   | 29 
 2 files changed, 40 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/be9dd155/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
--
diff --git 
a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala 
b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index a976e96..a8859fc 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -34,7 +34,9 @@ import org.apache.spark.util.ParentClassLoader
 /**
  * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
  * used to load classes defined by the interpreter when the REPL is used.
- * Allows the user to specify if user class path should be first
+ * Allows the user to specify if user class path should be first.
+ * This class loader delegates getting/finding resources to parent loader,
+ * which makes sense until REPL never provide resource dynamically.
  */
 class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: 
ClassLoader,
 userClassPathFirst: Boolean) extends ClassLoader with Logging {
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, 
parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
+  }
+
+  override def getResources(name: String): java.util.Enumeration[URL] = {
+parentLoader.getResources(name)
+  }
+
   override def findClass(name: String): Class[_] = {
 userClassPathFirst match {
   case true => 
findClassLocally(name).getOrElse(parentLoader.loadClass(name))

http://git-wip-us.apache.org/repos/asf/spark/blob/be9dd155/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
--
diff --git 
a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala 
b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index a58eda1..c1211f7 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -19,8 +19,13 @@ package org.apache.spark.repl
 
 import java.io.File
 import java.net.{URL, URLClassLoader}
+import java.nio.charset.StandardCharsets
+import java.util
+
+import com.google.common.io.Files
 
 import scala.concurrent.duration._
+import scala.io.Source
 import scala.language.implicitConversions
 import scala.language.postfixOps
 
@@ -41,6 +46,7 @@ class ExecutorClassLoaderSuite
 
   val childClassNames = List("ReplFakeClass1", "ReplFakeClass2")
   val parentClassNames = List("ReplFakeClass1", "ReplFakeClass2", 
"ReplFakeClass3")
+  val parentResourceNames = List("fake-resource.txt")
   var tempDir1: File = _
   var tempDir2: File = _
   var url1: String = _
@@ -54,6 +60,9 @@ class ExecutorClassLoaderSuite
 url1 = "file://" + tempDir1
 urls2 = List(tempDir2.toURI.toURL).toArray
 childClassNames.foreach(TestUtils.createCompiledClass(_, tempDir1, "1"))
+parentResourceNames.foreach { x =>
+  Files.write("resource".getBytes(StandardCharsets.UTF_8), new 
File(tempDir2, x))
+}
 parentClassNames.foreach(TestUtils.createCompiledClass(_, tempDir2, "2"))
   }
 
@@ -99,6 +108,26 @@ class ExecutorClassLoaderSuite
 }
   }
 
+  test("resource from parent") {
+val parentLoader = new URLClassLoader(urls2, null)
+val classLoader = new ExecutorClassLoader(new SparkConf(), url1, 
parentLoader, true)
+val resourceName

spark git commit: [SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from …

2015-11-24 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 f1f2cee4c -> 895128505


[SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from …

…parent class loader

Without patch, two additional tests of ExecutorClassLoaderSuite fails.

- "resource from parent"
- "resources from parent"

Detailed explanation is here, 
https://issues.apache.org/jira/browse/SPARK-11818?focusedCommentId=15011202&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15011202

Author: Jungtaek Lim 

Closes #9812 from HeartSaVioR/SPARK-11818.

(cherry picked from commit be9dd1550c1816559d3d418a19c692e715f1c94e)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89512850
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89512850
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89512850

Branch: refs/heads/branch-1.6
Commit: 8951285054c4a4496b1c736fe225491d6d8d3426
Parents: f1f2cee
Author: Jungtaek Lim 
Authored: Tue Nov 24 09:20:09 2015 -0800
Committer: Marcelo Vanzin 
Committed: Tue Nov 24 09:20:24 2015 -0800

--
 .../apache/spark/repl/ExecutorClassLoader.scala | 12 +++-
 .../spark/repl/ExecutorClassLoaderSuite.scala   | 29 
 2 files changed, 40 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/89512850/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
--
diff --git 
a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala 
b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index a976e96..a8859fc 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -34,7 +34,9 @@ import org.apache.spark.util.ParentClassLoader
 /**
  * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
  * used to load classes defined by the interpreter when the REPL is used.
- * Allows the user to specify if user class path should be first
+ * Allows the user to specify if user class path should be first.
+ * This class loader delegates getting/finding resources to parent loader,
+ * which makes sense until REPL never provide resource dynamically.
  */
 class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: 
ClassLoader,
 userClassPathFirst: Boolean) extends ClassLoader with Logging {
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, 
parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
+  }
+
+  override def getResources(name: String): java.util.Enumeration[URL] = {
+parentLoader.getResources(name)
+  }
+
   override def findClass(name: String): Class[_] = {
 userClassPathFirst match {
   case true => 
findClassLocally(name).getOrElse(parentLoader.loadClass(name))

http://git-wip-us.apache.org/repos/asf/spark/blob/89512850/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
--
diff --git 
a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala 
b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index a58eda1..c1211f7 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -19,8 +19,13 @@ package org.apache.spark.repl
 
 import java.io.File
 import java.net.{URL, URLClassLoader}
+import java.nio.charset.StandardCharsets
+import java.util
+
+import com.google.common.io.Files
 
 import scala.concurrent.duration._
+import scala.io.Source
 import scala.language.implicitConversions
 import scala.language.postfixOps
 
@@ -41,6 +46,7 @@ class ExecutorClassLoaderSuite
 
   val childClassNames = List("ReplFakeClass1", "ReplFakeClass2")
   val parentClassNames = List("ReplFakeClass1", "ReplFakeClass2", 
"ReplFakeClass3")
+  val parentResourceNames = List("fake-resource.txt")
   var tempDir1: File = _
   var tempDir2: File = _
   var url1: String = _
@@ -54,6 +60,9 @@ class ExecutorClassLoaderSuite
 url1 = "file://" + tempDir1
 urls2 = List(tempDir2.toURI.toURL).toArray
 childClassNames.foreach(TestUtils.createCompiledClass(_, tempDir1, "1"))
+parentResourceNames.foreach { x =>
+  Files.write("resource".getBytes(StandardCharsets.UTF_8), new 
File(tempDir2, x))
+}
 parentClassNames.foreach(TestUtils.createCompiledClass(_, tempDir2, "2"))
   }
 
@@ -99,6 +108,26 @@ class ExecutorClassLoaderSuite
 }
   }
 
+  test("resource from parent") {
+val parentLoader = new URLClassLoader(urls2, null)
+  

spark git commit: [SPARK-11942][SQL] fix encoder life cycle for CoGroup

2015-11-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 895128505 -> 3cb1b6d39


[SPARK-11942][SQL] fix encoder life cycle for CoGroup

we should pass in resolved encodera to logical `CoGroup` and bind them in 
physical `CoGroup`

Author: Wenchen Fan 

Closes #9928 from cloud-fan/cogroup.

(cherry picked from commit e5aaae6e1145b8c25c4872b2992ab425da9c6f9b)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cb1b6d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cb1b6d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cb1b6d3

Branch: refs/heads/branch-1.6
Commit: 3cb1b6d398a8066a6c9d800a16b8cd21f3fde50f
Parents: 8951285
Author: Wenchen Fan 
Authored: Tue Nov 24 09:28:39 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 24 09:28:51 2015 -0800

--
 .../catalyst/plans/logical/basicOperators.scala | 27 +++-
 .../org/apache/spark/sql/GroupedDataset.scala   |  4 ++-
 .../spark/sql/execution/basicOperators.scala| 20 ---
 .../org/apache/spark/sql/DatasetSuite.scala | 12 +
 4 files changed, 41 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b6d3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 737e62f..5665fd7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -553,19 +553,22 @@ case class MapGroups[K, T, U](
 
 /** Factory for constructing new `CoGroup` nodes. */
 object CoGroup {
-  def apply[K : Encoder, Left : Encoder, Right : Encoder, R : Encoder](
-  func: (K, Iterator[Left], Iterator[Right]) => TraversableOnce[R],
+  def apply[Key, Left, Right, Result : Encoder](
+  func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+  keyEnc: ExpressionEncoder[Key],
+  leftEnc: ExpressionEncoder[Left],
+  rightEnc: ExpressionEncoder[Right],
   leftGroup: Seq[Attribute],
   rightGroup: Seq[Attribute],
   left: LogicalPlan,
-  right: LogicalPlan): CoGroup[K, Left, Right, R] = {
+  right: LogicalPlan): CoGroup[Key, Left, Right, Result] = {
 CoGroup(
   func,
-  encoderFor[K],
-  encoderFor[Left],
-  encoderFor[Right],
-  encoderFor[R],
-  encoderFor[R].schema.toAttributes,
+  keyEnc,
+  leftEnc,
+  rightEnc,
+  encoderFor[Result],
+  encoderFor[Result].schema.toAttributes,
   leftGroup,
   rightGroup,
   left,
@@ -577,12 +580,12 @@ object CoGroup {
  * A relation produced by applying `func` to each grouping key and associated 
values from left and
  * right children.
  */
-case class CoGroup[K, Left, Right, R](
-func: (K, Iterator[Left], Iterator[Right]) => TraversableOnce[R],
-kEncoder: ExpressionEncoder[K],
+case class CoGroup[Key, Left, Right, Result](
+func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+keyEnc: ExpressionEncoder[Key],
 leftEnc: ExpressionEncoder[Left],
 rightEnc: ExpressionEncoder[Right],
-rEncoder: ExpressionEncoder[R],
+resultEnc: ExpressionEncoder[Result],
 output: Seq[Attribute],
 leftGroup: Seq[Attribute],
 rightGroup: Seq[Attribute],

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b6d3/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index 793a86b..a10a893 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -304,11 +304,13 @@ class GroupedDataset[K, V] private[sql](
   def cogroup[U, R : Encoder](
   other: GroupedDataset[K, U])(
   f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
-implicit def uEnc: Encoder[U] = other.unresolvedVEncoder
 new Dataset[R](
   sqlContext,
   CoGroup(
 f,
+this.resolvedKEncoder,
+this.resolvedVEncoder,
+other.resolvedVEncoder,
 this.groupingAttributes,
 other.groupingAttributes,
 this.logicalPlan,

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b6d3/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperato

spark git commit: [SPARK-11942][SQL] fix encoder life cycle for CoGroup

2015-11-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master be9dd1550 -> e5aaae6e1


[SPARK-11942][SQL] fix encoder life cycle for CoGroup

we should pass in resolved encodera to logical `CoGroup` and bind them in 
physical `CoGroup`

Author: Wenchen Fan 

Closes #9928 from cloud-fan/cogroup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5aaae6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5aaae6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5aaae6e

Branch: refs/heads/master
Commit: e5aaae6e1145b8c25c4872b2992ab425da9c6f9b
Parents: be9dd15
Author: Wenchen Fan 
Authored: Tue Nov 24 09:28:39 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 24 09:28:39 2015 -0800

--
 .../catalyst/plans/logical/basicOperators.scala | 27 +++-
 .../org/apache/spark/sql/GroupedDataset.scala   |  4 ++-
 .../spark/sql/execution/basicOperators.scala| 20 ---
 .../org/apache/spark/sql/DatasetSuite.scala | 12 +
 4 files changed, 41 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e5aaae6e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 737e62f..5665fd7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -553,19 +553,22 @@ case class MapGroups[K, T, U](
 
 /** Factory for constructing new `CoGroup` nodes. */
 object CoGroup {
-  def apply[K : Encoder, Left : Encoder, Right : Encoder, R : Encoder](
-  func: (K, Iterator[Left], Iterator[Right]) => TraversableOnce[R],
+  def apply[Key, Left, Right, Result : Encoder](
+  func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+  keyEnc: ExpressionEncoder[Key],
+  leftEnc: ExpressionEncoder[Left],
+  rightEnc: ExpressionEncoder[Right],
   leftGroup: Seq[Attribute],
   rightGroup: Seq[Attribute],
   left: LogicalPlan,
-  right: LogicalPlan): CoGroup[K, Left, Right, R] = {
+  right: LogicalPlan): CoGroup[Key, Left, Right, Result] = {
 CoGroup(
   func,
-  encoderFor[K],
-  encoderFor[Left],
-  encoderFor[Right],
-  encoderFor[R],
-  encoderFor[R].schema.toAttributes,
+  keyEnc,
+  leftEnc,
+  rightEnc,
+  encoderFor[Result],
+  encoderFor[Result].schema.toAttributes,
   leftGroup,
   rightGroup,
   left,
@@ -577,12 +580,12 @@ object CoGroup {
  * A relation produced by applying `func` to each grouping key and associated 
values from left and
  * right children.
  */
-case class CoGroup[K, Left, Right, R](
-func: (K, Iterator[Left], Iterator[Right]) => TraversableOnce[R],
-kEncoder: ExpressionEncoder[K],
+case class CoGroup[Key, Left, Right, Result](
+func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+keyEnc: ExpressionEncoder[Key],
 leftEnc: ExpressionEncoder[Left],
 rightEnc: ExpressionEncoder[Right],
-rEncoder: ExpressionEncoder[R],
+resultEnc: ExpressionEncoder[Result],
 output: Seq[Attribute],
 leftGroup: Seq[Attribute],
 rightGroup: Seq[Attribute],

http://git-wip-us.apache.org/repos/asf/spark/blob/e5aaae6e/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index 793a86b..a10a893 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -304,11 +304,13 @@ class GroupedDataset[K, V] private[sql](
   def cogroup[U, R : Encoder](
   other: GroupedDataset[K, U])(
   f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
-implicit def uEnc: Encoder[U] = other.unresolvedVEncoder
 new Dataset[R](
   sqlContext,
   CoGroup(
 f,
+this.resolvedKEncoder,
+this.resolvedVEncoder,
+other.resolvedVEncoder,
 this.groupingAttributes,
 other.groupingAttributes,
 this.logicalPlan,

http://git-wip-us.apache.org/repos/asf/spark/blob/e5aaae6e/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
--
diff --git 
a/sql/core/src/main

spark git commit: [SPARK-11952][ML] Remove duplicate ml examples

2015-11-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master e5aaae6e1 -> 56a0aba0a


[SPARK-11952][ML] Remove duplicate ml examples

Remove duplicate ml examples (only for ml).  mengxr

Author: Yanbo Liang 

Closes #9933 from yanboliang/SPARK-11685.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56a0aba0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56a0aba0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56a0aba0

Branch: refs/heads/master
Commit: 56a0aba0a60326ba026056c9a23f3f6ec7258c19
Parents: e5aaae6
Author: Yanbo Liang 
Authored: Tue Nov 24 09:52:53 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Nov 24 09:52:53 2015 -0800

--
 .../main/python/ml/gradient_boosted_trees.py| 82 --
 .../src/main/python/ml/logistic_regression.py   | 66 ---
 .../src/main/python/ml/random_forest_example.py | 87 
 3 files changed, 235 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/56a0aba0/examples/src/main/python/ml/gradient_boosted_trees.py
--
diff --git a/examples/src/main/python/ml/gradient_boosted_trees.py 
b/examples/src/main/python/ml/gradient_boosted_trees.py
deleted file mode 100644
index c3bf8aa..000
--- a/examples/src/main/python/ml/gradient_boosted_trees.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-from pyspark.ml.classification import GBTClassifier
-from pyspark.ml.feature import StringIndexer
-from pyspark.ml.regression import GBTRegressor
-from pyspark.mllib.evaluation import BinaryClassificationMetrics, 
RegressionMetrics
-from pyspark.sql import Row, SQLContext
-
-"""
-A simple example demonstrating a Gradient Boosted Trees 
Classification/Regression Pipeline.
-Note: GBTClassifier only supports binary classification currently
-Run with:
-  bin/spark-submit examples/src/main/python/ml/gradient_boosted_trees.py
-"""
-
-
-def testClassification(train, test):
-# Train a GradientBoostedTrees model.
-
-rf = GBTClassifier(maxIter=30, maxDepth=4, labelCol="indexedLabel")
-
-model = rf.fit(train)
-predictionAndLabels = model.transform(test).select("prediction", 
"indexedLabel") \
-.map(lambda x: (x.prediction, x.indexedLabel))
-
-metrics = BinaryClassificationMetrics(predictionAndLabels)
-print("AUC %.3f" % metrics.areaUnderROC)
-
-
-def testRegression(train, test):
-# Train a GradientBoostedTrees model.
-
-rf = GBTRegressor(maxIter=30, maxDepth=4, labelCol="indexedLabel")
-
-model = rf.fit(train)
-predictionAndLabels = model.transform(test).select("prediction", 
"indexedLabel") \
-.map(lambda x: (x.prediction, x.indexedLabel))
-
-metrics = RegressionMetrics(predictionAndLabels)
-print("rmse %.3f" % metrics.rootMeanSquaredError)
-print("r2 %.3f" % metrics.r2)
-print("mae %.3f" % metrics.meanAbsoluteError)
-
-
-if __name__ == "__main__":
-if len(sys.argv) > 1:
-print("Usage: gradient_boosted_trees", file=sys.stderr)
-exit(1)
-sc = SparkContext(appName="PythonGBTExample")
-sqlContext = SQLContext(sc)
-
-# Load the data stored in LIBSVM format as a DataFrame.
-df = 
sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
-
-# Map labels into an indexed column of labels in [0, numLabels)
-stringIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
-si_model = stringIndexer.fit(df)
-td = si_model.transform(df)
-[train, test] = td.randomSplit([0.7, 0.3])
-testClassification(train, test)
-testRegression(train, test)
-sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/56a0aba0/examples/src/main/python/ml/logistic_regression.py
--
diff --git a/examples/src/main/python/ml/logistic_regression.py 
b/examples/src/main/python/ml/logistic_regression.py
de

spark git commit: [SPARK-11952][ML] Remove duplicate ml examples

2015-11-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 3cb1b6d39 -> 6914b7504


[SPARK-11952][ML] Remove duplicate ml examples

Remove duplicate ml examples (only for ml).  mengxr

Author: Yanbo Liang 

Closes #9933 from yanboliang/SPARK-11685.

(cherry picked from commit 56a0aba0a60326ba026056c9a23f3f6ec7258c19)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6914b750
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6914b750
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6914b750

Branch: refs/heads/branch-1.6
Commit: 6914b75046dceda47ba3ff904e67f55752e8d49d
Parents: 3cb1b6d
Author: Yanbo Liang 
Authored: Tue Nov 24 09:52:53 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Nov 24 09:53:01 2015 -0800

--
 .../main/python/ml/gradient_boosted_trees.py| 82 --
 .../src/main/python/ml/logistic_regression.py   | 66 ---
 .../src/main/python/ml/random_forest_example.py | 87 
 3 files changed, 235 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6914b750/examples/src/main/python/ml/gradient_boosted_trees.py
--
diff --git a/examples/src/main/python/ml/gradient_boosted_trees.py 
b/examples/src/main/python/ml/gradient_boosted_trees.py
deleted file mode 100644
index c3bf8aa..000
--- a/examples/src/main/python/ml/gradient_boosted_trees.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-from pyspark.ml.classification import GBTClassifier
-from pyspark.ml.feature import StringIndexer
-from pyspark.ml.regression import GBTRegressor
-from pyspark.mllib.evaluation import BinaryClassificationMetrics, 
RegressionMetrics
-from pyspark.sql import Row, SQLContext
-
-"""
-A simple example demonstrating a Gradient Boosted Trees 
Classification/Regression Pipeline.
-Note: GBTClassifier only supports binary classification currently
-Run with:
-  bin/spark-submit examples/src/main/python/ml/gradient_boosted_trees.py
-"""
-
-
-def testClassification(train, test):
-# Train a GradientBoostedTrees model.
-
-rf = GBTClassifier(maxIter=30, maxDepth=4, labelCol="indexedLabel")
-
-model = rf.fit(train)
-predictionAndLabels = model.transform(test).select("prediction", 
"indexedLabel") \
-.map(lambda x: (x.prediction, x.indexedLabel))
-
-metrics = BinaryClassificationMetrics(predictionAndLabels)
-print("AUC %.3f" % metrics.areaUnderROC)
-
-
-def testRegression(train, test):
-# Train a GradientBoostedTrees model.
-
-rf = GBTRegressor(maxIter=30, maxDepth=4, labelCol="indexedLabel")
-
-model = rf.fit(train)
-predictionAndLabels = model.transform(test).select("prediction", 
"indexedLabel") \
-.map(lambda x: (x.prediction, x.indexedLabel))
-
-metrics = RegressionMetrics(predictionAndLabels)
-print("rmse %.3f" % metrics.rootMeanSquaredError)
-print("r2 %.3f" % metrics.r2)
-print("mae %.3f" % metrics.meanAbsoluteError)
-
-
-if __name__ == "__main__":
-if len(sys.argv) > 1:
-print("Usage: gradient_boosted_trees", file=sys.stderr)
-exit(1)
-sc = SparkContext(appName="PythonGBTExample")
-sqlContext = SQLContext(sc)
-
-# Load the data stored in LIBSVM format as a DataFrame.
-df = 
sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
-
-# Map labels into an indexed column of labels in [0, numLabels)
-stringIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
-si_model = stringIndexer.fit(df)
-td = si_model.transform(df)
-[train, test] = td.randomSplit([0.7, 0.3])
-testClassification(train, test)
-testRegression(train, test)
-sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/6914b750/examples/src/main/python/ml/logistic_regression.py
--
diff --git a

spark git commit: [SPARK-11521][ML][DOC] Document that Logistic, Linear Regression summaries ignore weight col

2015-11-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 56a0aba0a -> 9e24ba667


[SPARK-11521][ML][DOC] Document that Logistic, Linear Regression summaries 
ignore weight col

Doc for 1.6 that the summaries mostly ignore the weight column.
To be corrected for 1.7

CC: mengxr thunterdb

Author: Joseph K. Bradley 

Closes #9927 from jkbradley/linregsummary-doc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e24ba66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e24ba66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e24ba66

Branch: refs/heads/master
Commit: 9e24ba667e43290fbaa3cacb93cf5d9be790f1fd
Parents: 56a0aba
Author: Joseph K. Bradley 
Authored: Tue Nov 24 09:54:55 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Nov 24 09:54:55 2015 -0800

--
 .../ml/classification/LogisticRegression.scala| 18 ++
 .../spark/ml/regression/LinearRegression.scala| 15 +++
 2 files changed, 33 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9e24ba66/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 418bbdc..d320d64 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -755,23 +755,35 @@ class BinaryLogisticRegressionSummary 
private[classification] (
* Returns the receiver operating characteristic (ROC) curve,
* which is an Dataframe having two fields (FPR, TPR)
* with (0.0, 0.0) prepended and (1.0, 1.0) appended to it.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
* @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic
*/
   @transient lazy val roc: DataFrame = binaryMetrics.roc().toDF("FPR", "TPR")
 
   /**
* Computes the area under the receiver operating characteristic (ROC) curve.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   lazy val areaUnderROC: Double = binaryMetrics.areaUnderROC()
 
   /**
* Returns the precision-recall curve, which is an Dataframe containing
* two fields recall, precision with (0.0, 1.0) prepended to it.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   @transient lazy val pr: DataFrame = binaryMetrics.pr().toDF("recall", 
"precision")
 
   /**
* Returns a dataframe with two fields (threshold, F-Measure) curve with 
beta = 1.0.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   @transient lazy val fMeasureByThreshold: DataFrame = {
 binaryMetrics.fMeasureByThreshold().toDF("threshold", "F-Measure")
@@ -781,6 +793,9 @@ class BinaryLogisticRegressionSummary 
private[classification] (
* Returns a dataframe with two fields (threshold, precision) curve.
* Every possible probability obtained in transforming the dataset are used
* as thresholds used in calculating the precision.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   @transient lazy val precisionByThreshold: DataFrame = {
 binaryMetrics.precisionByThreshold().toDF("threshold", "precision")
@@ -790,6 +805,9 @@ class BinaryLogisticRegressionSummary 
private[classification] (
* Returns a dataframe with two fields (threshold, recall) curve.
* Every possible probability obtained in transforming the dataset are used
* as thresholds used in calculating the recall.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   @transient lazy val recallByThreshold: DataFrame = {
 binaryMetrics.recallByThreshold().toDF("threshold", "recall")

http://git-wip-us.apache.org/repos/asf/spark/blob/9e24ba66/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala 
b/mllib/src/main/scala/org/apac

spark git commit: [SPARK-11521][ML][DOC] Document that Logistic, Linear Regression summaries ignore weight col

2015-11-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6914b7504 -> 70febe224


[SPARK-11521][ML][DOC] Document that Logistic, Linear Regression summaries 
ignore weight col

Doc for 1.6 that the summaries mostly ignore the weight column.
To be corrected for 1.7

CC: mengxr thunterdb

Author: Joseph K. Bradley 

Closes #9927 from jkbradley/linregsummary-doc.

(cherry picked from commit 9e24ba667e43290fbaa3cacb93cf5d9be790f1fd)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70febe22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70febe22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70febe22

Branch: refs/heads/branch-1.6
Commit: 70febe224f64cb6468c14d4788a63b35d0475d41
Parents: 6914b75
Author: Joseph K. Bradley 
Authored: Tue Nov 24 09:54:55 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Nov 24 09:55:02 2015 -0800

--
 .../ml/classification/LogisticRegression.scala| 18 ++
 .../spark/ml/regression/LinearRegression.scala| 15 +++
 2 files changed, 33 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/70febe22/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 418bbdc..d320d64 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -755,23 +755,35 @@ class BinaryLogisticRegressionSummary 
private[classification] (
* Returns the receiver operating characteristic (ROC) curve,
* which is an Dataframe having two fields (FPR, TPR)
* with (0.0, 0.0) prepended and (1.0, 1.0) appended to it.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
* @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic
*/
   @transient lazy val roc: DataFrame = binaryMetrics.roc().toDF("FPR", "TPR")
 
   /**
* Computes the area under the receiver operating characteristic (ROC) curve.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   lazy val areaUnderROC: Double = binaryMetrics.areaUnderROC()
 
   /**
* Returns the precision-recall curve, which is an Dataframe containing
* two fields recall, precision with (0.0, 1.0) prepended to it.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   @transient lazy val pr: DataFrame = binaryMetrics.pr().toDF("recall", 
"precision")
 
   /**
* Returns a dataframe with two fields (threshold, F-Measure) curve with 
beta = 1.0.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   @transient lazy val fMeasureByThreshold: DataFrame = {
 binaryMetrics.fMeasureByThreshold().toDF("threshold", "F-Measure")
@@ -781,6 +793,9 @@ class BinaryLogisticRegressionSummary 
private[classification] (
* Returns a dataframe with two fields (threshold, precision) curve.
* Every possible probability obtained in transforming the dataset are used
* as thresholds used in calculating the precision.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   @transient lazy val precisionByThreshold: DataFrame = {
 binaryMetrics.precisionByThreshold().toDF("threshold", "precision")
@@ -790,6 +805,9 @@ class BinaryLogisticRegressionSummary 
private[classification] (
* Returns a dataframe with two fields (threshold, recall) curve.
* Every possible probability obtained in transforming the dataset are used
* as thresholds used in calculating the recall.
+   *
+   * Note: This ignores instance weights (setting all to 1.0) from 
[[LogisticRegression.weightCol]].
+   *   This will change in later Spark versions.
*/
   @transient lazy val recallByThreshold: DataFrame = {
 binaryMetrics.recallByThreshold().toDF("threshold", "recall")

http://git-wip-us.apache.org/repos/asf/spark/blob/70febe22/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
--
diff --git 
a

spark git commit: [SPARK-11847][ML] Model export/import for spark.ml: LDA

2015-11-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 70febe224 -> af86c38db


[SPARK-11847][ML] Model export/import for spark.ml: LDA

Add read/write support to LDA, similar to ALS.

save/load for ml.LocalLDAModel is done.
For DistributedLDAModel, I'm not sure if we can invoke save on the 
mllib.DistributedLDAModel directly. I'll send update after some test.

Author: Yuhao Yang 

Closes #9894 from hhbyyh/ldaMLsave.

(cherry picked from commit 52bc25c8e26d4be250d8ff7864067528f4f98592)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af86c38d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af86c38d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af86c38d

Branch: refs/heads/branch-1.6
Commit: af86c38db7676c4dfc2724d5f86f0f5f3a22e349
Parents: 70febe2
Author: Yuhao Yang 
Authored: Tue Nov 24 09:56:17 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Nov 24 09:56:24 2015 -0800

--
 .../org/apache/spark/ml/clustering/LDA.scala| 110 ++-
 .../spark/mllib/clustering/LDAModel.scala   |   4 +-
 .../apache/spark/ml/clustering/LDASuite.scala   |  44 +++-
 3 files changed, 150 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/af86c38d/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 92e0581..830510b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.ml.clustering
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
 import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.ml.util.{SchemaUtils, Identifiable}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.param.shared.{HasCheckpointInterval, 
HasFeaturesCol, HasSeed, HasMaxIter}
 import org.apache.spark.ml.param._
+import org.apache.spark.ml.util._
 import org.apache.spark.mllib.clustering.{DistributedLDAModel => 
OldDistributedLDAModel,
 EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => 
OldLDAModel,
 LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
@@ -322,7 +323,7 @@ sealed abstract class LDAModel private[ml] (
 @Since("1.6.0") override val uid: String,
 @Since("1.6.0") val vocabSize: Int,
 @Since("1.6.0") @transient protected val sqlContext: SQLContext)
-  extends Model[LDAModel] with LDAParams with Logging {
+  extends Model[LDAModel] with LDAParams with Logging with MLWritable {
 
   // NOTE to developers:
   //  This abstraction should contain all important functionality for basic 
LDA usage.
@@ -486,6 +487,64 @@ class LocalLDAModel private[ml] (
 
   @Since("1.6.0")
   override def isDistributed: Boolean = false
+
+  @Since("1.6.0")
+  override def write: MLWriter = new LocalLDAModel.LocalLDAModelWriter(this)
+}
+
+
+@Since("1.6.0")
+object LocalLDAModel extends MLReadable[LocalLDAModel] {
+
+  private[LocalLDAModel]
+  class LocalLDAModelWriter(instance: LocalLDAModel) extends MLWriter {
+
+private case class Data(
+vocabSize: Int,
+topicsMatrix: Matrix,
+docConcentration: Vector,
+topicConcentration: Double,
+gammaShape: Double)
+
+override protected def saveImpl(path: String): Unit = {
+  DefaultParamsWriter.saveMetadata(instance, path, sc)
+  val oldModel = instance.oldLocalModel
+  val data = Data(instance.vocabSize, oldModel.topicsMatrix, 
oldModel.docConcentration,
+oldModel.topicConcentration, oldModel.gammaShape)
+  val dataPath = new Path(path, "data").toString
+  
sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+}
+  }
+
+  private class LocalLDAModelReader extends MLReader[LocalLDAModel] {
+
+private val className = classOf[LocalLDAModel].getName
+
+override def load(path: String): LocalLDAModel = {
+  val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+  val dataPath = new Path(path, "data").toString
+  val data = sqlContext.read.parquet(dataPath)
+.select("vocabSize", "topicsMatrix", "docConcentration", 
"topicConcentration",
+  "gammaShape")
+.head()
+  val vocabSize = data.getAs[Int](0)
+  val topicsMatrix = data.getAs[Matrix](1)
+  val docConcentration = data.getAs[Vector](2)
+  val topicConcentration = data.getAs[Double](3)
+  val gammaShape = data.getAs[Double](4)
+  val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, 
topicConcentratio

spark git commit: [SPARK-11847][ML] Model export/import for spark.ml: LDA

2015-11-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 9e24ba667 -> 52bc25c8e


[SPARK-11847][ML] Model export/import for spark.ml: LDA

Add read/write support to LDA, similar to ALS.

save/load for ml.LocalLDAModel is done.
For DistributedLDAModel, I'm not sure if we can invoke save on the 
mllib.DistributedLDAModel directly. I'll send update after some test.

Author: Yuhao Yang 

Closes #9894 from hhbyyh/ldaMLsave.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52bc25c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52bc25c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52bc25c8

Branch: refs/heads/master
Commit: 52bc25c8e26d4be250d8ff7864067528f4f98592
Parents: 9e24ba6
Author: Yuhao Yang 
Authored: Tue Nov 24 09:56:17 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Nov 24 09:56:17 2015 -0800

--
 .../org/apache/spark/ml/clustering/LDA.scala| 110 ++-
 .../spark/mllib/clustering/LDAModel.scala   |   4 +-
 .../apache/spark/ml/clustering/LDASuite.scala   |  44 +++-
 3 files changed, 150 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/52bc25c8/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 92e0581..830510b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.ml.clustering
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
 import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.ml.util.{SchemaUtils, Identifiable}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.param.shared.{HasCheckpointInterval, 
HasFeaturesCol, HasSeed, HasMaxIter}
 import org.apache.spark.ml.param._
+import org.apache.spark.ml.util._
 import org.apache.spark.mllib.clustering.{DistributedLDAModel => 
OldDistributedLDAModel,
 EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => 
OldLDAModel,
 LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
@@ -322,7 +323,7 @@ sealed abstract class LDAModel private[ml] (
 @Since("1.6.0") override val uid: String,
 @Since("1.6.0") val vocabSize: Int,
 @Since("1.6.0") @transient protected val sqlContext: SQLContext)
-  extends Model[LDAModel] with LDAParams with Logging {
+  extends Model[LDAModel] with LDAParams with Logging with MLWritable {
 
   // NOTE to developers:
   //  This abstraction should contain all important functionality for basic 
LDA usage.
@@ -486,6 +487,64 @@ class LocalLDAModel private[ml] (
 
   @Since("1.6.0")
   override def isDistributed: Boolean = false
+
+  @Since("1.6.0")
+  override def write: MLWriter = new LocalLDAModel.LocalLDAModelWriter(this)
+}
+
+
+@Since("1.6.0")
+object LocalLDAModel extends MLReadable[LocalLDAModel] {
+
+  private[LocalLDAModel]
+  class LocalLDAModelWriter(instance: LocalLDAModel) extends MLWriter {
+
+private case class Data(
+vocabSize: Int,
+topicsMatrix: Matrix,
+docConcentration: Vector,
+topicConcentration: Double,
+gammaShape: Double)
+
+override protected def saveImpl(path: String): Unit = {
+  DefaultParamsWriter.saveMetadata(instance, path, sc)
+  val oldModel = instance.oldLocalModel
+  val data = Data(instance.vocabSize, oldModel.topicsMatrix, 
oldModel.docConcentration,
+oldModel.topicConcentration, oldModel.gammaShape)
+  val dataPath = new Path(path, "data").toString
+  
sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+}
+  }
+
+  private class LocalLDAModelReader extends MLReader[LocalLDAModel] {
+
+private val className = classOf[LocalLDAModel].getName
+
+override def load(path: String): LocalLDAModel = {
+  val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+  val dataPath = new Path(path, "data").toString
+  val data = sqlContext.read.parquet(dataPath)
+.select("vocabSize", "topicsMatrix", "docConcentration", 
"topicConcentration",
+  "gammaShape")
+.head()
+  val vocabSize = data.getAs[Int](0)
+  val topicsMatrix = data.getAs[Matrix](1)
+  val docConcentration = data.getAs[Vector](2)
+  val topicConcentration = data.getAs[Double](3)
+  val gammaShape = data.getAs[Double](4)
+  val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, 
topicConcentration,
+gammaShape)
+  val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, 
sqlContext)

spark git commit: [SPARK-11926][SQL] unify GetStructField and GetInternalRowField

2015-11-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 52bc25c8e -> 19530da69


[SPARK-11926][SQL] unify GetStructField and GetInternalRowField

Author: Wenchen Fan 

Closes #9909 from cloud-fan/get-struct.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19530da6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19530da6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19530da6

Branch: refs/heads/master
Commit: 19530da6903fa59b051eec69b9c17e231c68454b
Parents: 52bc25c
Author: Wenchen Fan 
Authored: Tue Nov 24 11:09:01 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 24 11:09:01 2015 -0800

--
 .../spark/sql/catalyst/ScalaReflection.scala|  2 +-
 .../sql/catalyst/analysis/unresolved.scala  |  8 
 .../catalyst/encoders/ExpressionEncoder.scala   |  2 +-
 .../sql/catalyst/encoders/RowEncoder.scala  |  2 +-
 .../sql/catalyst/expressions/Expression.scala   |  2 +-
 .../expressions/complexTypeExtractors.scala | 18 -
 .../catalyst/expressions/namedExpressions.scala |  4 ++--
 .../sql/catalyst/expressions/objects.scala  | 21 
 .../catalyst/expressions/ComplexTypeSuite.scala |  4 ++--
 9 files changed, 21 insertions(+), 42 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/19530da6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 476bece..d133ad3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -130,7 +130,7 @@ object ScalaReflection extends ScalaReflection {
 
 /** Returns the current path with a field at ordinal extracted. */
 def addToPathOrdinal(ordinal: Int, dataType: DataType): Expression = path
-  .map(p => GetInternalRowField(p, ordinal, dataType))
+  .map(p => GetStructField(p, ordinal))
   .getOrElse(BoundReference(ordinal, dataType, false))
 
 /** Returns the current path or `BoundReference`. */

http://git-wip-us.apache.org/repos/asf/spark/blob/19530da6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 6485bdf..1b2a8dc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -201,12 +201,12 @@ case class UnresolvedStar(target: Option[Seq[String]]) 
extends Star with Unevalu
 if (attribute.isDefined) {
   // This target resolved to an attribute in child. It must be a struct. 
Expand it.
   attribute.get.dataType match {
-case s: StructType => {
-  s.fields.map( f => {
-val extract = GetStructField(attribute.get, f, 
s.getFieldIndex(f.name).get)
+case s: StructType => s.zipWithIndex.map {
+  case (f, i) =>
+val extract = GetStructField(attribute.get, i)
 Alias(extract, target.get + "." + f.name)()
-  })
 }
+
 case _ => {
   throw new AnalysisException("Can only star expand struct data types. 
Attribute: `" +
 target.get + "`")

http://git-wip-us.apache.org/repos/asf/spark/blob/19530da6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 7bc9aed..0c10a56 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -111,7 +111,7 @@ object ExpressionEncoder {
   case UnresolvedAttribute(nameParts) =>
 assert(nameParts.length == 1)
 UnresolvedExtractValue(input, Literal(nameParts.head))
-  case BoundReference(ordinal, dt, _) => GetInternalRowField(input, 
ordinal, dt)
+  case BoundReference(ordinal, dt, _) => GetStructField(input, ordinal)
 }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1953

spark git commit: [SPARK-11926][SQL] unify GetStructField and GetInternalRowField

2015-11-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 af86c38db -> 927070d6d


[SPARK-11926][SQL] unify GetStructField and GetInternalRowField

Author: Wenchen Fan 

Closes #9909 from cloud-fan/get-struct.

(cherry picked from commit 19530da6903fa59b051eec69b9c17e231c68454b)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/927070d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/927070d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/927070d6

Branch: refs/heads/branch-1.6
Commit: 927070d6d75abcaac6d9676b1b9b556f21ec8536
Parents: af86c38
Author: Wenchen Fan 
Authored: Tue Nov 24 11:09:01 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Nov 24 11:10:58 2015 -0800

--
 .../spark/sql/catalyst/ScalaReflection.scala|  2 +-
 .../sql/catalyst/analysis/unresolved.scala  |  8 
 .../catalyst/encoders/ExpressionEncoder.scala   |  2 +-
 .../sql/catalyst/encoders/RowEncoder.scala  |  2 +-
 .../sql/catalyst/expressions/Expression.scala   |  2 +-
 .../expressions/complexTypeExtractors.scala | 18 -
 .../catalyst/expressions/namedExpressions.scala |  4 ++--
 .../sql/catalyst/expressions/objects.scala  | 21 
 .../catalyst/expressions/ComplexTypeSuite.scala |  4 ++--
 9 files changed, 21 insertions(+), 42 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/927070d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 476bece..d133ad3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -130,7 +130,7 @@ object ScalaReflection extends ScalaReflection {
 
 /** Returns the current path with a field at ordinal extracted. */
 def addToPathOrdinal(ordinal: Int, dataType: DataType): Expression = path
-  .map(p => GetInternalRowField(p, ordinal, dataType))
+  .map(p => GetStructField(p, ordinal))
   .getOrElse(BoundReference(ordinal, dataType, false))
 
 /** Returns the current path or `BoundReference`. */

http://git-wip-us.apache.org/repos/asf/spark/blob/927070d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 6485bdf..1b2a8dc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -201,12 +201,12 @@ case class UnresolvedStar(target: Option[Seq[String]]) 
extends Star with Unevalu
 if (attribute.isDefined) {
   // This target resolved to an attribute in child. It must be a struct. 
Expand it.
   attribute.get.dataType match {
-case s: StructType => {
-  s.fields.map( f => {
-val extract = GetStructField(attribute.get, f, 
s.getFieldIndex(f.name).get)
+case s: StructType => s.zipWithIndex.map {
+  case (f, i) =>
+val extract = GetStructField(attribute.get, i)
 Alias(extract, target.get + "." + f.name)()
-  })
 }
+
 case _ => {
   throw new AnalysisException("Can only star expand struct data types. 
Attribute: `" +
 target.get + "`")

http://git-wip-us.apache.org/repos/asf/spark/blob/927070d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 7bc9aed..0c10a56 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -111,7 +111,7 @@ object ExpressionEncoder {
   case UnresolvedAttribute(nameParts) =>
 assert(nameParts.length == 1)
 UnresolvedExtractValue(input, Literal(nameParts.head))
-  case BoundReference(ordinal, dt, _) => GetInternalRowField(input, 
ordinal, dt)
+  case BoundReference(ordinal, dt, _) => Ge

spark git commit: [SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's thread

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 19530da69 -> 81012546e


[SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's 
thread

This is continuation of SPARK-11761

Andrew suggested adding this protection. See tail of 
https://github.com/apache/spark/pull/9741

Author: tedyu 

Closes #9852 from tedyu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81012546
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81012546
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81012546

Branch: refs/heads/master
Commit: 81012546ee5a80d2576740af0dad067b0f5962c5
Parents: 19530da
Author: tedyu 
Authored: Tue Nov 24 12:22:33 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 12:22:33 2015 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   |  4 +++
 .../spark/scheduler/SparkListenerSuite.scala| 31 
 2 files changed, 35 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81012546/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b153a7b..e19ba11 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 
   // Shut down the SparkContext.
   def stop() {
+if (AsynchronousListenerBus.withinListenerThread.value) {
+  throw new SparkException("Cannot stop SparkContext within listener 
thread of" +
+" AsynchronousListenerBus")
+}
 // Use the stopping variable to ensure no contention for the stop scenario.
 // Still track the stopped variable for use elsewhere in the code.
 if (!stopped.compareAndSet(false, true)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/81012546/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 84e5458..f20d5be 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.Matchers
 
+import org.apache.spark.SparkException
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
@@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
 
   val jobCompletionTime = 1421191296660L
 
+  test("don't call sc.stop in listener") {
+sc = new SparkContext("local", "SparkListenerSuite")
+val listener = new SparkContextStoppingListener(sc)
+val bus = new LiveListenerBus
+bus.addListener(listener)
+
+// Starting listener bus should flush all buffered events
+bus.start(sc)
+bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+bus.stop()
+assert(listener.sparkExSeen)
+  }
+
   test("basic creation and shutdown of LiveListenerBus") {
 val counter = new BasicJobCounter
 val bus = new LiveListenerBus
@@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
 
+/**
+ * A simple listener that tries to stop SparkContext.
+ */
+private class SparkContextStoppingListener(val sc: SparkContext) extends 
SparkListener {
+  @volatile var sparkExSeen = false
+  override def onJobEnd(job: SparkListenerJobEnd): Unit = {
+try {
+  sc.stop()
+} catch {
+  case se: SparkException =>
+sparkExSeen = true
+}
+  }
+}
+
 private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends 
SparkListener {
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's thread

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 927070d6d -> 0419fd361


[SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's 
thread

This is continuation of SPARK-11761

Andrew suggested adding this protection. See tail of 
https://github.com/apache/spark/pull/9741

Author: tedyu 

Closes #9852 from tedyu/master.

(cherry picked from commit 81012546ee5a80d2576740af0dad067b0f5962c5)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0419fd36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0419fd36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0419fd36

Branch: refs/heads/branch-1.6
Commit: 0419fd3615ec73be1b2774e768238bf3f0cb4403
Parents: 927070d
Author: tedyu 
Authored: Tue Nov 24 12:22:33 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 12:23:51 2015 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   |  4 +++
 .../spark/scheduler/SparkListenerSuite.scala| 31 
 2 files changed, 35 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0419fd36/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index af4456c..90480e5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 
   // Shut down the SparkContext.
   def stop() {
+if (AsynchronousListenerBus.withinListenerThread.value) {
+  throw new SparkException("Cannot stop SparkContext within listener 
thread of" +
+" AsynchronousListenerBus")
+}
 // Use the stopping variable to ensure no contention for the stop scenario.
 // Still track the stopped variable for use elsewhere in the code.
 if (!stopped.compareAndSet(false, true)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0419fd36/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 84e5458..f20d5be 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.Matchers
 
+import org.apache.spark.SparkException
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
@@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
 
   val jobCompletionTime = 1421191296660L
 
+  test("don't call sc.stop in listener") {
+sc = new SparkContext("local", "SparkListenerSuite")
+val listener = new SparkContextStoppingListener(sc)
+val bus = new LiveListenerBus
+bus.addListener(listener)
+
+// Starting listener bus should flush all buffered events
+bus.start(sc)
+bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+bus.stop()
+assert(listener.sparkExSeen)
+  }
+
   test("basic creation and shutdown of LiveListenerBus") {
 val counter = new BasicJobCounter
 val bus = new LiveListenerBus
@@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
 
+/**
+ * A simple listener that tries to stop SparkContext.
+ */
+private class SparkContextStoppingListener(val sc: SparkContext) extends 
SparkListener {
+  @volatile var sparkExSeen = false
+  override def onJobEnd(job: SparkListenerJobEnd): Unit = {
+try {
+  sc.stop()
+} catch {
+  case se: SparkException =>
+sparkExSeen = true
+}
+  }
+}
+
 private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends 
SparkListener {
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11946][SQL] Audit pivot API for 1.6.

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 81012546e -> f31527227


[SPARK-11946][SQL] Audit pivot API for 1.6.

Currently pivot's signature looks like

```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData

scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```

I think we can remove the one that takes "Column" types, since callers should 
always be passing in literals. It'd also be more clear if the values are not 
varargs, but rather Seq or java.util.List.

I also made similar changes for Python.

Author: Reynold Xin 

Closes #9929 from rxin/SPARK-11946.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3152722
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3152722
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3152722

Branch: refs/heads/master
Commit: f3152722791b163fa66597b3684009058195ba33
Parents: 8101254
Author: Reynold Xin 
Authored: Tue Nov 24 12:54:37 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 12:54:37 2015 -0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   |   1 -
 python/pyspark/sql/group.py |  12 +-
 .../sql/catalyst/expressions/literals.scala |   1 +
 .../org/apache/spark/sql/GroupedData.scala  | 154 +++
 .../apache/spark/sql/JavaDataFrameSuite.java|  16 ++
 .../apache/spark/sql/DataFramePivotSuite.scala  |  21 +--
 .../org/apache/spark/sql/test/SQLTestData.scala |   1 +
 7 files changed, 125 insertions(+), 81 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f3152722/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ae725b4..77a184d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1574,7 +1574,6 @@ class DAGScheduler(
   }
 
   def stop() {
-logInfo("Stopping DAGScheduler")
 messageScheduler.shutdownNow()
 eventProcessLoop.stop()
 taskScheduler.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/f3152722/python/pyspark/sql/group.py
--
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 227f40b..d8ed7eb 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -168,20 +168,24 @@ class GroupedData(object):
 """
 
 @since(1.6)
-def pivot(self, pivot_col, *values):
+def pivot(self, pivot_col, values=None):
 """Pivots a column of the current DataFrame and preform the specified 
aggregation.
 
 :param pivot_col: Column to pivot
 :param values: Optional list of values of pivotColumn that will be 
translated to columns in
 the output data frame. If values are not provided the method with 
do an immediate call
 to .distinct() on the pivot column.
->>> df4.groupBy("year").pivot("course", "dotNET", 
"Java").sum("earnings").collect()
+
+>>> df4.groupBy("year").pivot("course", ["dotNET", 
"Java"]).sum("earnings").collect()
 [Row(year=2012, dotNET=15000, Java=2), Row(year=2013, 
dotNET=48000, Java=3)]
+
 >>> df4.groupBy("year").pivot("course").sum("earnings").collect()
 [Row(year=2012, Java=2, dotNET=15000), Row(year=2013, Java=3, 
dotNET=48000)]
 """
-jgd = self._jdf.pivot(_to_java_column(pivot_col),
-  _to_seq(self.sql_ctx._sc, values, 
_create_column_from_literal))
+if values is None:
+jgd = self._jdf.pivot(pivot_col)
+else:
+jgd = self._jdf.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f3152722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index e34fd49..68ec688 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -44,6 +44,7 @@ object Literal {
 case a: Array[Byte] => Literal(a, BinaryType)
 case i: CalendarInterval => Literal(i, CalendarIntervalType)
 case null => Literal(null, NullType)
+case v: Lite

spark git commit: [SPARK-11946][SQL] Audit pivot API for 1.6.

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 0419fd361 -> 3f40af574


[SPARK-11946][SQL] Audit pivot API for 1.6.

Currently pivot's signature looks like

```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData

scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```

I think we can remove the one that takes "Column" types, since callers should 
always be passing in literals. It'd also be more clear if the values are not 
varargs, but rather Seq or java.util.List.

I also made similar changes for Python.

Author: Reynold Xin 

Closes #9929 from rxin/SPARK-11946.

(cherry picked from commit f3152722791b163fa66597b3684009058195ba33)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f40af57
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f40af57
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f40af57

Branch: refs/heads/branch-1.6
Commit: 3f40af574ed14909d30c10cad24d747fba49b783
Parents: 0419fd3
Author: Reynold Xin 
Authored: Tue Nov 24 12:54:37 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 12:54:44 2015 -0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   |   1 -
 python/pyspark/sql/group.py |  12 +-
 .../sql/catalyst/expressions/literals.scala |   1 +
 .../org/apache/spark/sql/GroupedData.scala  | 154 +++
 .../apache/spark/sql/JavaDataFrameSuite.java|  16 ++
 .../apache/spark/sql/DataFramePivotSuite.scala  |  21 +--
 .../org/apache/spark/sql/test/SQLTestData.scala |   1 +
 7 files changed, 125 insertions(+), 81 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f40af57/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ae725b4..77a184d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1574,7 +1574,6 @@ class DAGScheduler(
   }
 
   def stop() {
-logInfo("Stopping DAGScheduler")
 messageScheduler.shutdownNow()
 eventProcessLoop.stop()
 taskScheduler.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/3f40af57/python/pyspark/sql/group.py
--
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 227f40b..d8ed7eb 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -168,20 +168,24 @@ class GroupedData(object):
 """
 
 @since(1.6)
-def pivot(self, pivot_col, *values):
+def pivot(self, pivot_col, values=None):
 """Pivots a column of the current DataFrame and preform the specified 
aggregation.
 
 :param pivot_col: Column to pivot
 :param values: Optional list of values of pivotColumn that will be 
translated to columns in
 the output data frame. If values are not provided the method with 
do an immediate call
 to .distinct() on the pivot column.
->>> df4.groupBy("year").pivot("course", "dotNET", 
"Java").sum("earnings").collect()
+
+>>> df4.groupBy("year").pivot("course", ["dotNET", 
"Java"]).sum("earnings").collect()
 [Row(year=2012, dotNET=15000, Java=2), Row(year=2013, 
dotNET=48000, Java=3)]
+
 >>> df4.groupBy("year").pivot("course").sum("earnings").collect()
 [Row(year=2012, Java=2, dotNET=15000), Row(year=2013, Java=3, 
dotNET=48000)]
 """
-jgd = self._jdf.pivot(_to_java_column(pivot_col),
-  _to_seq(self.sql_ctx._sc, values, 
_create_column_from_literal))
+if values is None:
+jgd = self._jdf.pivot(pivot_col)
+else:
+jgd = self._jdf.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3f40af57/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index e34fd49..68ec688 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -44,6 +44,7 @@ object Literal {
 case a: Array[Byte] => Literal(a, BinaryType)
 case i: Calend

spark git commit: [SPARK-11929][CORE] Make the repl log4j configuration override the root logger.

2015-11-24 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master f31527227 -> e6dd23746


[SPARK-11929][CORE] Make the repl log4j configuration override the root logger.

In the default Spark distribution, there are currently two separate
log4j config files, with different default values for the root logger,
so that when running the shell you have a different default log level.
This makes the shell more usable, since the logs don't overwhelm the
output.

But if you install a custom log4j.properties, you lose that, because
then it's going to be used no matter whether you're running a regular
app or the shell.

With this change, the overriding of the log level is done differently;
the log level repl's main class (org.apache.spark.repl.Main) is used
to define the root logger's level when running the shell, defaulting
to WARN if it's not set explicitly.

On a somewhat related change, the shell output about the "sc" variable
was changed a bit to contain a little more useful information about
the application, since when the root logger's log level is WARN, that
information is never shown to the user.

Author: Marcelo Vanzin 

Closes #9816 from vanzin/shell-logging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6dd2374
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6dd2374
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6dd2374

Branch: refs/heads/master
Commit: e6dd237463d2de8c506f0735dfdb3f43e8122513
Parents: f315272
Author: Marcelo Vanzin 
Authored: Tue Nov 24 15:08:02 2015 -0600
Committer: Imran Rashid 
Committed: Tue Nov 24 15:08:02 2015 -0600

--
 conf/log4j.properties.template  |  5 +++
 .../apache/spark/log4j-defaults-repl.properties | 33 --
 .../org/apache/spark/log4j-defaults.properties  |  5 +++
 .../main/scala/org/apache/spark/Logging.scala   | 45 ++--
 .../org/apache/spark/repl/SparkILoopInit.scala  | 21 -
 .../org/apache/spark/repl/SparkILoop.scala  | 25 +--
 6 files changed, 57 insertions(+), 77 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e6dd2374/conf/log4j.properties.template
--
diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template
index f3046be..9809b0c 100644
--- a/conf/log4j.properties.template
+++ b/conf/log4j.properties.template
@@ -22,6 +22,11 @@ log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
 
+# Set the default spark-shell log level to WARN. When running the spark-shell, 
the
+# log level for this class is used to overwrite the root logger's log level, 
so that
+# the user can have different defaults for the shell and regular Spark apps.
+log4j.logger.org.apache.spark.repl.Main=WARN
+
 # Settings to quiet third party logs that are too verbose
 log4j.logger.org.spark-project.jetty=WARN
 log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR

http://git-wip-us.apache.org/repos/asf/spark/blob/e6dd2374/core/src/main/resources/org/apache/spark/log4j-defaults-repl.properties
--
diff --git 
a/core/src/main/resources/org/apache/spark/log4j-defaults-repl.properties 
b/core/src/main/resources/org/apache/spark/log4j-defaults-repl.properties
deleted file mode 100644
index c85abc3..000
--- a/core/src/main/resources/org/apache/spark/log4j-defaults-repl.properties
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the console
-log4j.rootCategory=WARN, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.or

spark git commit: [SPARK-11805] free the array in UnsafeExternalSorter during spilling

2015-11-24 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 3f40af574 -> 015569341


[SPARK-11805] free the array in UnsafeExternalSorter during spilling

After calling spill() on SortedIterator, the array inside InMemorySorter is not 
needed, it should be freed during spilling, this could help to join multiple 
tables with limited memory.

Author: Davies Liu 

Closes #9793 from davies/free_array.

(cherry picked from commit 58d9b260556a89a3d0832d583acafba1df7c6751)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01556934
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01556934
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01556934

Branch: refs/heads/branch-1.6
Commit: 015569341dc8031b62ea94a2d194bbd35567f727
Parents: 3f40af5
Author: Davies Liu 
Authored: Tue Nov 24 14:33:28 2015 -0800
Committer: Josh Rosen 
Committed: Tue Nov 24 14:33:40 2015 -0800

--
 .../unsafe/sort/UnsafeExternalSorter.java   | 10 ---
 .../unsafe/sort/UnsafeInMemorySorter.java   | 31 
 2 files changed, 19 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/01556934/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 9a7b2ad..2e40312 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -468,6 +468,12 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
   }
   allocatedPages.clear();
 }
+
+// in-memory sorter will not be used after spilling
+assert(inMemSorter != null);
+released += inMemSorter.getMemoryUsage();
+inMemSorter.free();
+inMemSorter = null;
 return released;
   }
 }
@@ -489,10 +495,6 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
   }
   upstream = nextUpstream;
   nextUpstream = null;
-
-  assert(inMemSorter != null);
-  inMemSorter.free();
-  inMemSorter = null;
 }
 numRecords--;
 upstream.loadNext();

http://git-wip-us.apache.org/repos/asf/spark/blob/01556934/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index a218ad4..dce1f15 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -108,6 +108,7 @@ public final class UnsafeInMemorySorter {
*/
   public void free() {
 consumer.freeArray(array);
+array = null;
   }
 
   public void reset() {
@@ -160,28 +161,22 @@ public final class UnsafeInMemorySorter {
 pos++;
   }
 
-  public static final class SortedIterator extends UnsafeSorterIterator {
+  public final class SortedIterator extends UnsafeSorterIterator {
 
-private final TaskMemoryManager memoryManager;
-private final int sortBufferInsertPosition;
-private final LongArray sortBuffer;
-private int position = 0;
+private final int numRecords;
+private int position;
 private Object baseObject;
 private long baseOffset;
 private long keyPrefix;
 private int recordLength;
 
-private SortedIterator(
-TaskMemoryManager memoryManager,
-int sortBufferInsertPosition,
-LongArray sortBuffer) {
-  this.memoryManager = memoryManager;
-  this.sortBufferInsertPosition = sortBufferInsertPosition;
-  this.sortBuffer = sortBuffer;
+private SortedIterator(int numRecords) {
+  this.numRecords = numRecords;
+  this.position = 0;
 }
 
 public SortedIterator clone () {
-  SortedIterator iter = new SortedIterator(memoryManager, 
sortBufferInsertPosition, sortBuffer);
+  SortedIterator iter = new SortedIterator(numRecords);
   iter.position = position;
   iter.baseObject = baseObject;
   iter.baseOffset = baseOffset;
@@ -192,21 +187,21 @@ public final class UnsafeInMemorySorter {
 
 @Override
 public boolean hasNext() {
-  return position < sortBufferInsertPosition;
+  return position 

spark git commit: [SPARK-11805] free the array in UnsafeExternalSorter during spilling

2015-11-24 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master e6dd23746 -> 58d9b2605


[SPARK-11805] free the array in UnsafeExternalSorter during spilling

After calling spill() on SortedIterator, the array inside InMemorySorter is not 
needed, it should be freed during spilling, this could help to join multiple 
tables with limited memory.

Author: Davies Liu 

Closes #9793 from davies/free_array.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58d9b260
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58d9b260
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58d9b260

Branch: refs/heads/master
Commit: 58d9b260556a89a3d0832d583acafba1df7c6751
Parents: e6dd237
Author: Davies Liu 
Authored: Tue Nov 24 14:33:28 2015 -0800
Committer: Josh Rosen 
Committed: Tue Nov 24 14:33:28 2015 -0800

--
 .../unsafe/sort/UnsafeExternalSorter.java   | 10 ---
 .../unsafe/sort/UnsafeInMemorySorter.java   | 31 
 2 files changed, 19 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/58d9b260/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 9a7b2ad..2e40312 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -468,6 +468,12 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
   }
   allocatedPages.clear();
 }
+
+// in-memory sorter will not be used after spilling
+assert(inMemSorter != null);
+released += inMemSorter.getMemoryUsage();
+inMemSorter.free();
+inMemSorter = null;
 return released;
   }
 }
@@ -489,10 +495,6 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
   }
   upstream = nextUpstream;
   nextUpstream = null;
-
-  assert(inMemSorter != null);
-  inMemSorter.free();
-  inMemSorter = null;
 }
 numRecords--;
 upstream.loadNext();

http://git-wip-us.apache.org/repos/asf/spark/blob/58d9b260/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index a218ad4..dce1f15 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -108,6 +108,7 @@ public final class UnsafeInMemorySorter {
*/
   public void free() {
 consumer.freeArray(array);
+array = null;
   }
 
   public void reset() {
@@ -160,28 +161,22 @@ public final class UnsafeInMemorySorter {
 pos++;
   }
 
-  public static final class SortedIterator extends UnsafeSorterIterator {
+  public final class SortedIterator extends UnsafeSorterIterator {
 
-private final TaskMemoryManager memoryManager;
-private final int sortBufferInsertPosition;
-private final LongArray sortBuffer;
-private int position = 0;
+private final int numRecords;
+private int position;
 private Object baseObject;
 private long baseOffset;
 private long keyPrefix;
 private int recordLength;
 
-private SortedIterator(
-TaskMemoryManager memoryManager,
-int sortBufferInsertPosition,
-LongArray sortBuffer) {
-  this.memoryManager = memoryManager;
-  this.sortBufferInsertPosition = sortBufferInsertPosition;
-  this.sortBuffer = sortBuffer;
+private SortedIterator(int numRecords) {
+  this.numRecords = numRecords;
+  this.position = 0;
 }
 
 public SortedIterator clone () {
-  SortedIterator iter = new SortedIterator(memoryManager, 
sortBufferInsertPosition, sortBuffer);
+  SortedIterator iter = new SortedIterator(numRecords);
   iter.position = position;
   iter.baseObject = baseObject;
   iter.baseOffset = baseOffset;
@@ -192,21 +187,21 @@ public final class UnsafeInMemorySorter {
 
 @Override
 public boolean hasNext() {
-  return position < sortBufferInsertPosition;
+  return position / 2 < numRecords;
 }
 
 public int numRecordsLeft() {
-  return (sortBufferInsertPosition - p

spark git commit: Added a line of comment to explain why the extra sort exists in pivot.

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 58d9b2605 -> 34ca392da


Added a line of comment to explain why the extra sort exists in pivot.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34ca392d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34ca392d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34ca392d

Branch: refs/heads/master
Commit: 34ca392da7097a1fbe48cd6c3ebff51453ca26ca
Parents: 58d9b26
Author: Reynold Xin 
Authored: Tue Nov 24 14:51:01 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 14:51:01 2015 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34ca392d/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index ee7150c..abd531c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -304,7 +304,7 @@ class GroupedData protected[sql](
 // Get the distinct values of the column and sort them so its consistent
 val values = df.select(pivotColumn)
   .distinct()
-  .sort(pivotColumn)
+  .sort(pivotColumn)  // ensure that the output columns are in a 
consistent logical order
   .map(_.get(0))
   .take(maxValues + 1)
   .toSeq


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11783][SQL] Fixes execution Hive client when using remote Hive metastore

2015-11-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 34ca392da -> c7f95df5c


[SPARK-11783][SQL] Fixes execution Hive client when using remote Hive metastore

When using remote Hive metastore, `hive.metastore.uris` is set to the metastore 
URI.  However, it overrides `javax.jdo.option.ConnectionURL` unexpectedly, thus 
the execution Hive client connects to the actual remote Hive metastore instead 
of the Derby metastore created in the temporary directory.  Cleaning this 
configuration for the execution Hive client fixes this issue.

Author: Cheng Lian 

Closes #9895 from liancheng/spark-11783.clean-remote-metastore-config.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7f95df5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7f95df5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7f95df5

Branch: refs/heads/master
Commit: c7f95df5c6d8eb2e6f11cf58b704fea34326a5f2
Parents: 34ca392
Author: Cheng Lian 
Authored: Tue Nov 24 14:59:14 2015 -0800
Committer: Yin Huai 
Committed: Tue Nov 24 15:08:53 2015 -0800

--
 .../org/apache/spark/sql/hive/HiveContext.scala  | 15 +++
 1 file changed, 15 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c7f95df5/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c0bb5af..8a42641 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -736,6 +736,21 @@ private[hive] object HiveContext {
   
s"jdbc:derby:;databaseName=${localMetastore.getAbsolutePath};create=true")
 propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
   "org.datanucleus.store.rdbms.adapter.DerbyAdapter")
+
+// SPARK-11783: When "hive.metastore.uris" is set, the metastore 
connection mode will be
+// remote 
(https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin
+// mentions that "If hive.metastore.uris is empty local mode is assumed, 
remote otherwise").
+// Remote means that the metastore server is running in its own process.
+// When the mode is remote, configurations like 
"javax.jdo.option.ConnectionURL" will not be
+// used (because they are used by remote metastore server that talks to 
the database).
+// Because execution Hive should always connects to a embedded derby 
metastore.
+// We have to remove the value of hive.metastore.uris. So, the execution 
Hive client connects
+// to the actual embedded derby metastore instead of the remote metastore.
+// You can search HiveConf.ConfVars.METASTOREURIS in the code of HiveConf 
(in Hive's repo).
+// Then, you will find that the local metastore mode is only set to true 
when
+// hive.metastore.uris is not set.
+propMap.put(ConfVars.METASTOREURIS.varname, "")
+
 propMap.toMap
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11783][SQL] Fixes execution Hive client when using remote Hive metastore

2015-11-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 015569341 -> 3f15ad783


[SPARK-11783][SQL] Fixes execution Hive client when using remote Hive metastore

When using remote Hive metastore, `hive.metastore.uris` is set to the metastore 
URI.  However, it overrides `javax.jdo.option.ConnectionURL` unexpectedly, thus 
the execution Hive client connects to the actual remote Hive metastore instead 
of the Derby metastore created in the temporary directory.  Cleaning this 
configuration for the execution Hive client fixes this issue.

Author: Cheng Lian 

Closes #9895 from liancheng/spark-11783.clean-remote-metastore-config.

(cherry picked from commit c7f95df5c6d8eb2e6f11cf58b704fea34326a5f2)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f15ad78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f15ad78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f15ad78

Branch: refs/heads/branch-1.6
Commit: 3f15ad783886d077b5ecf5eee1edfc2d699bb35a
Parents: 0155693
Author: Cheng Lian 
Authored: Tue Nov 24 14:59:14 2015 -0800
Committer: Yin Huai 
Committed: Tue Nov 24 15:09:17 2015 -0800

--
 .../org/apache/spark/sql/hive/HiveContext.scala  | 15 +++
 1 file changed, 15 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f15ad78/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c0bb5af..8a42641 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -736,6 +736,21 @@ private[hive] object HiveContext {
   
s"jdbc:derby:;databaseName=${localMetastore.getAbsolutePath};create=true")
 propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
   "org.datanucleus.store.rdbms.adapter.DerbyAdapter")
+
+// SPARK-11783: When "hive.metastore.uris" is set, the metastore 
connection mode will be
+// remote 
(https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin
+// mentions that "If hive.metastore.uris is empty local mode is assumed, 
remote otherwise").
+// Remote means that the metastore server is running in its own process.
+// When the mode is remote, configurations like 
"javax.jdo.option.ConnectionURL" will not be
+// used (because they are used by remote metastore server that talks to 
the database).
+// Because execution Hive should always connects to a embedded derby 
metastore.
+// We have to remove the value of hive.metastore.uris. So, the execution 
Hive client connects
+// to the actual embedded derby metastore instead of the remote metastore.
+// You can search HiveConf.ConfVars.METASTOREURIS in the code of HiveConf 
(in Hive's repo).
+// Then, you will find that the local metastore mode is only set to true 
when
+// hive.metastore.uris is not set.
+propMap.put(ConfVars.METASTOREURIS.varname, "")
+
 propMap.toMap
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11914][SQL] Support coalesce and repartition in Dataset APIs

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 3f15ad783 -> 6d8c4c644


[SPARK-11914][SQL] Support coalesce and repartition in Dataset APIs

This PR is to provide two common `coalesce` and `repartition` in Dataset APIs.

After reading the comments of SPARK-, I am unclear about the plan for 
supporting re-partitioning in Dataset APIs. Currently, both RDD APIs and 
Dataframe APIs provide users such a flexibility to control the number of 
partitions.

In most traditional RDBMS, they expose the number of partitions, the 
partitioning columns, the table partitioning methods to DBAs for performance 
tuning and storage planning. Normally, these parameters could largely affect 
the query performance. Since the actual performance depends on the workload 
types, I think it is almost impossible to automate the discovery of the best 
partitioning strategy for all the scenarios.

I am wondering if Dataset APIs are planning to hide these APIs from users? Feel 
free to reject my PR if it does not match the plan.

Thank you for your answers. marmbrus rxin cloud-fan

Author: gatorsmile 

Closes #9899 from gatorsmile/coalesce.

(cherry picked from commit 238ae51b66ac12d15fba6aff061804004c5ca6cb)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d8c4c64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d8c4c64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d8c4c64

Branch: refs/heads/branch-1.6
Commit: 6d8c4c6448e898dad36e243da3eec647d2a45076
Parents: 3f15ad7
Author: gatorsmile 
Authored: Tue Nov 24 15:54:10 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 15:54:16 2015 -0800

--
 .../scala/org/apache/spark/sql/Dataset.scala | 19 +++
 .../org/apache/spark/sql/DatasetSuite.scala  | 15 +++
 2 files changed, 34 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6d8c4c64/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0764750..17e2611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -152,6 +152,25 @@ class Dataset[T] private[sql](
*/
   def count(): Long = toDF().count()
 
+  /**
+* Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
+* @since 1.6.0
+*/
+  def repartition(numPartitions: Int): Dataset[T] = withPlan {
+Repartition(numPartitions, shuffle = true, _)
+  }
+
+  /**
+* Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
+* Similar to coalesce defined on an [[RDD]], this operation results in a 
narrow dependency, e.g.
+* if you go from 1000 partitions to 100 partitions, there will not be a 
shuffle, instead each of
+* the 100 new partitions will claim 10 of the current partitions.
+* @since 1.6.0
+*/
+  def coalesce(numPartitions: Int): Dataset[T] = withPlan {
+Repartition(numPartitions, shuffle = false, _)
+  }
+
   /* *** *
*  Functional Operations  *
* *** */

http://git-wip-us.apache.org/repos/asf/spark/blob/6d8c4c64/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 13eede1..c253fdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -52,6 +52,21 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 assert(ds.takeAsList(1).get(0) == item)
   }
 
+  test("coalesce, repartition") {
+val data = (1 to 100).map(i => ClassData(i.toString, i))
+val ds = data.toDS()
+
+assert(ds.repartition(10).rdd.partitions.length == 10)
+checkAnswer(
+  ds.repartition(10),
+  data: _*)
+
+assert(ds.coalesce(1).rdd.partitions.length == 1)
+checkAnswer(
+  ds.coalesce(1),
+  data: _*)
+  }
+
   test("as tuple") {
 val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
 checkAnswer(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11914][SQL] Support coalesce and repartition in Dataset APIs

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master c7f95df5c -> 238ae51b6


[SPARK-11914][SQL] Support coalesce and repartition in Dataset APIs

This PR is to provide two common `coalesce` and `repartition` in Dataset APIs.

After reading the comments of SPARK-, I am unclear about the plan for 
supporting re-partitioning in Dataset APIs. Currently, both RDD APIs and 
Dataframe APIs provide users such a flexibility to control the number of 
partitions.

In most traditional RDBMS, they expose the number of partitions, the 
partitioning columns, the table partitioning methods to DBAs for performance 
tuning and storage planning. Normally, these parameters could largely affect 
the query performance. Since the actual performance depends on the workload 
types, I think it is almost impossible to automate the discovery of the best 
partitioning strategy for all the scenarios.

I am wondering if Dataset APIs are planning to hide these APIs from users? Feel 
free to reject my PR if it does not match the plan.

Thank you for your answers. marmbrus rxin cloud-fan

Author: gatorsmile 

Closes #9899 from gatorsmile/coalesce.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/238ae51b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/238ae51b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/238ae51b

Branch: refs/heads/master
Commit: 238ae51b66ac12d15fba6aff061804004c5ca6cb
Parents: c7f95df
Author: gatorsmile 
Authored: Tue Nov 24 15:54:10 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 15:54:10 2015 -0800

--
 .../scala/org/apache/spark/sql/Dataset.scala | 19 +++
 .../org/apache/spark/sql/DatasetSuite.scala  | 15 +++
 2 files changed, 34 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/238ae51b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0764750..17e2611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -152,6 +152,25 @@ class Dataset[T] private[sql](
*/
   def count(): Long = toDF().count()
 
+  /**
+* Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
+* @since 1.6.0
+*/
+  def repartition(numPartitions: Int): Dataset[T] = withPlan {
+Repartition(numPartitions, shuffle = true, _)
+  }
+
+  /**
+* Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
+* Similar to coalesce defined on an [[RDD]], this operation results in a 
narrow dependency, e.g.
+* if you go from 1000 partitions to 100 partitions, there will not be a 
shuffle, instead each of
+* the 100 new partitions will claim 10 of the current partitions.
+* @since 1.6.0
+*/
+  def coalesce(numPartitions: Int): Dataset[T] = withPlan {
+Repartition(numPartitions, shuffle = false, _)
+  }
+
   /* *** *
*  Functional Operations  *
* *** */

http://git-wip-us.apache.org/repos/asf/spark/blob/238ae51b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 13eede1..c253fdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -52,6 +52,21 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 assert(ds.takeAsList(1).get(0) == item)
   }
 
+  test("coalesce, repartition") {
+val data = (1 to 100).map(i => ClassData(i.toString, i))
+val ds = data.toDS()
+
+assert(ds.repartition(10).rdd.partitions.length == 10)
+checkAnswer(
+  ds.repartition(10),
+  data: _*)
+
+assert(ds.coalesce(1).rdd.partitions.length == 1)
+checkAnswer(
+  ds.coalesce(1),
+  data: _*)
+  }
+
   test("as tuple") {
 val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
 checkAnswer(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Added a line of comment to explain why the extra sort exists in pivot.

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6d8c4c644 -> 36a99f93f


Added a line of comment to explain why the extra sort exists in pivot.

(cherry picked from commit 34ca392da7097a1fbe48cd6c3ebff51453ca26ca)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36a99f93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36a99f93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36a99f93

Branch: refs/heads/branch-1.6
Commit: 36a99f93f7d77dac1159e5703d931d24cb24b154
Parents: 6d8c4c6
Author: Reynold Xin 
Authored: Tue Nov 24 14:51:01 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 16:23:40 2015 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/36a99f93/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index ee7150c..abd531c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -304,7 +304,7 @@ class GroupedData protected[sql](
 // Get the distinct values of the column and sort them so its consistent
 val values = df.select(pivotColumn)
   .distinct()
-  .sort(pivotColumn)
+  .sort(pivotColumn)  // ensure that the output columns are in a 
consistent logical order
   .map(_.get(0))
   .take(maxValues + 1)
   .toSeq


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11967][SQL] Consistent use of varargs for multiple paths in DataFrameReader

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 36a99f93f -> 4464fa25c


[SPARK-11967][SQL] Consistent use of varargs for multiple paths in 
DataFrameReader

This patch makes it consistent to use varargs in all DataFrameReader methods, 
including Parquet, JSON, text, and the generic load function.

Also added a few more API tests for the Java API.

Author: Reynold Xin 

Closes #9945 from rxin/SPARK-11967.

(cherry picked from commit 25bbd3c16e8e8be4d2c43000223d54650e9a3696)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4464fa25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4464fa25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4464fa25

Branch: refs/heads/branch-1.6
Commit: 4464fa25c227ce9012862f9940babc47527ebbf3
Parents: 36a99f9
Author: Reynold Xin 
Authored: Tue Nov 24 18:16:07 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 18:16:14 2015 -0800

--
 python/pyspark/sql/readwriter.py| 19 +++
 .../org/apache/spark/sql/DataFrameReader.scala  | 36 
 .../apache/spark/sql/JavaDataFrameSuite.java| 23 +
 sql/core/src/test/resources/text-suite2.txt |  1 +
 .../org/apache/spark/sql/DataFrameSuite.scala   |  2 +-
 5 files changed, 66 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4464fa25/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index e8f0d7e..2e75f0c 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -109,7 +109,7 @@ class DataFrameReader(object):
 def load(self, path=None, format=None, schema=None, **options):
 """Loads data from a data source and returns it as a :class`DataFrame`.
 
-:param path: optional string for file-system backed data sources.
+:param path: optional string or a list of string for file-system 
backed data sources.
 :param format: optional string for format of the data source. Default 
to 'parquet'.
 :param schema: optional :class:`StructType` for the input schema.
 :param options: all other string options
@@ -118,6 +118,7 @@ class DataFrameReader(object):
 ... opt2=1, opt3='str')
 >>> df.dtypes
 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
+
 >>> df = 
sqlContext.read.format('json').load(['python/test_support/sql/people.json',
 ... 'python/test_support/sql/people1.json'])
 >>> df.dtypes
@@ -130,10 +131,8 @@ class DataFrameReader(object):
 self.options(**options)
 if path is not None:
 if type(path) == list:
-paths = path
-gateway = self._sqlContext._sc._gateway
-jpaths = utils.toJArray(gateway, gateway.jvm.java.lang.String, 
paths)
-return self._df(self._jreader.load(jpaths))
+return self._df(
+
self._jreader.load(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
 else:
 return self._df(self._jreader.load(path))
 else:
@@ -175,6 +174,8 @@ class DataFrameReader(object):
 self.schema(schema)
 if isinstance(path, basestring):
 return self._df(self._jreader.json(path))
+elif type(path) == list:
+return 
self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
 elif isinstance(path, RDD):
 return self._df(self._jreader.json(path._jrdd))
 else:
@@ -205,16 +206,20 @@ class DataFrameReader(object):
 
 @ignore_unicode_prefix
 @since(1.6)
-def text(self, path):
+def text(self, paths):
 """Loads a text file and returns a [[DataFrame]] with a single string 
column named "text".
 
 Each line in the text file is a new row in the resulting DataFrame.
 
+:param paths: string, or list of strings, for input path(s).
+
 >>> df = sqlContext.read.text('python/test_support/sql/text-test.txt')
 >>> df.collect()
 [Row(value=u'hello'), Row(value=u'this')]
 """
-return self._df(self._jreader.text(path))
+if isinstance(paths, basestring):
+paths = [paths]
+return 
self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
 
 @since(1.5)
 def orc(self, path):

http://git-wip-us.apache.org/repos/asf/spark/blob/4464fa25/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b

spark git commit: [SPARK-11967][SQL] Consistent use of varargs for multiple paths in DataFrameReader

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 238ae51b6 -> 25bbd3c16


[SPARK-11967][SQL] Consistent use of varargs for multiple paths in 
DataFrameReader

This patch makes it consistent to use varargs in all DataFrameReader methods, 
including Parquet, JSON, text, and the generic load function.

Also added a few more API tests for the Java API.

Author: Reynold Xin 

Closes #9945 from rxin/SPARK-11967.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25bbd3c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25bbd3c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25bbd3c1

Branch: refs/heads/master
Commit: 25bbd3c16e8e8be4d2c43000223d54650e9a3696
Parents: 238ae51
Author: Reynold Xin 
Authored: Tue Nov 24 18:16:07 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 18:16:07 2015 -0800

--
 python/pyspark/sql/readwriter.py| 19 +++
 .../org/apache/spark/sql/DataFrameReader.scala  | 36 
 .../apache/spark/sql/JavaDataFrameSuite.java| 23 +
 sql/core/src/test/resources/text-suite2.txt |  1 +
 .../org/apache/spark/sql/DataFrameSuite.scala   |  2 +-
 5 files changed, 66 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/25bbd3c1/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index e8f0d7e..2e75f0c 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -109,7 +109,7 @@ class DataFrameReader(object):
 def load(self, path=None, format=None, schema=None, **options):
 """Loads data from a data source and returns it as a :class`DataFrame`.
 
-:param path: optional string for file-system backed data sources.
+:param path: optional string or a list of string for file-system 
backed data sources.
 :param format: optional string for format of the data source. Default 
to 'parquet'.
 :param schema: optional :class:`StructType` for the input schema.
 :param options: all other string options
@@ -118,6 +118,7 @@ class DataFrameReader(object):
 ... opt2=1, opt3='str')
 >>> df.dtypes
 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
+
 >>> df = 
sqlContext.read.format('json').load(['python/test_support/sql/people.json',
 ... 'python/test_support/sql/people1.json'])
 >>> df.dtypes
@@ -130,10 +131,8 @@ class DataFrameReader(object):
 self.options(**options)
 if path is not None:
 if type(path) == list:
-paths = path
-gateway = self._sqlContext._sc._gateway
-jpaths = utils.toJArray(gateway, gateway.jvm.java.lang.String, 
paths)
-return self._df(self._jreader.load(jpaths))
+return self._df(
+
self._jreader.load(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
 else:
 return self._df(self._jreader.load(path))
 else:
@@ -175,6 +174,8 @@ class DataFrameReader(object):
 self.schema(schema)
 if isinstance(path, basestring):
 return self._df(self._jreader.json(path))
+elif type(path) == list:
+return 
self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
 elif isinstance(path, RDD):
 return self._df(self._jreader.json(path._jrdd))
 else:
@@ -205,16 +206,20 @@ class DataFrameReader(object):
 
 @ignore_unicode_prefix
 @since(1.6)
-def text(self, path):
+def text(self, paths):
 """Loads a text file and returns a [[DataFrame]] with a single string 
column named "text".
 
 Each line in the text file is a new row in the resulting DataFrame.
 
+:param paths: string, or list of strings, for input path(s).
+
 >>> df = sqlContext.read.text('python/test_support/sql/text-test.txt')
 >>> df.collect()
 [Row(value=u'hello'), Row(value=u'this')]
 """
-return self._df(self._jreader.text(path))
+if isinstance(paths, basestring):
+paths = [paths]
+return 
self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
 
 @since(1.5)
 def orc(self, path):

http://git-wip-us.apache.org/repos/asf/spark/blob/25bbd3c1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index dcb3737..3ed1e55 100644
--- a/sq

spark git commit: [SPARK-11947][SQL] Mark deprecated methods with "This will be removed in Spark 2.0."

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4464fa25c -> 862d788fc


[SPARK-11947][SQL] Mark deprecated methods with "This will be removed in Spark 
2.0."

Also fixed some documentation as I saw them.

Author: Reynold Xin 

Closes #9930 from rxin/SPARK-11947.

(cherry picked from commit 4d6bbbc03ddb6650b00eb638e4876a196014c19c)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/862d788f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/862d788f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/862d788f

Branch: refs/heads/branch-1.6
Commit: 862d788fce8c4b91e9f821c192b18fc0310a6b41
Parents: 4464fa2
Author: Reynold Xin 
Authored: Tue Nov 24 18:58:55 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 18:59:01 2015 -0800

--
 project/MimaExcludes.scala  |   3 +-
 .../scala/org/apache/spark/sql/Column.scala |  20 ++--
 .../scala/org/apache/spark/sql/DataFrame.scala  |  72 +++
 .../scala/org/apache/spark/sql/Dataset.scala|   1 +
 .../scala/org/apache/spark/sql/SQLContext.scala |  88 +++---
 .../org/apache/spark/sql/SQLImplicits.scala |  25 +++-
 .../org/apache/spark/sql/SparkSQLParser.scala   | 118 ---
 .../spark/sql/execution/SparkSQLParser.scala| 117 ++
 .../scala/org/apache/spark/sql/functions.scala  |  52 
 .../SimpleTextHadoopFsRelationSuite.scala   |   6 +-
 10 files changed, 282 insertions(+), 220 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/862d788f/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index bb45d1b..54a9ad9 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -108,7 +108,8 @@ object MimaExcludes {
 ProblemFilters.exclude[MissingClassProblem](
   "org.apache.spark.rdd.MapPartitionsWithPreparationRDD"),
 ProblemFilters.exclude[MissingClassProblem](
-  "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$")
+  "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$"),
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SparkSQLParser")
   ) ++ Seq(
 // SPARK-11485
 
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df"),

http://git-wip-us.apache.org/repos/asf/spark/blob/862d788f/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 30c554a..b3cd9e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -42,7 +42,8 @@ private[sql] object Column {
 
 /**
  * A [[Column]] where an [[Encoder]] has been given for the expected input and 
return type.
- * @since 1.6.0
+ * To create a [[TypedColumn]], use the `as` function on a [[Column]].
+ *
  * @tparam T The input type expected for this expression.  Can be `Any` if the 
expression is type
  *   checked by the analyzer instead of the compiler (i.e. 
`expr("sum(...)")`).
  * @tparam U The output type of this column.
@@ -51,7 +52,8 @@ private[sql] object Column {
  */
 class TypedColumn[-T, U](
 expr: Expression,
-private[sql] val encoder: ExpressionEncoder[U]) extends Column(expr) {
+private[sql] val encoder: ExpressionEncoder[U])
+  extends Column(expr) {
 
   /**
* Inserts the specific input type and schema into any expressions that are 
expected to operate
@@ -61,12 +63,11 @@ class TypedColumn[-T, U](
   inputEncoder: ExpressionEncoder[_],
   schema: Seq[Attribute]): TypedColumn[T, U] = {
 val boundEncoder = 
inputEncoder.bind(schema).asInstanceOf[ExpressionEncoder[Any]]
-new TypedColumn[T, U] (expr transform {
-  case ta: TypedAggregateExpression if ta.aEncoder.isEmpty =>
-ta.copy(
-  aEncoder = Some(boundEncoder),
-  children = schema)
-}, encoder)
+new TypedColumn[T, U](
+  expr transform { case ta: TypedAggregateExpression if 
ta.aEncoder.isEmpty =>
+ta.copy(aEncoder = Some(boundEncoder), children = schema)
+  },
+  encoder)
   }
 }
 
@@ -691,8 +692,9 @@ class Column(protected[sql] val expr: Expression) extends 
Logging {
*
* @group expr_ops
* @since 1.3.0
+   * @deprecated As of 1.5.0. Use isin. This will be removed in Spark 2.0.
*/
-  @deprecated("use isin", "1.5.0")
+  @deprecated("use isin. This will be removed in Spark 2.0.", "1.5.0")
   @scala.annotation.varargs
   def in(list: Any*): Column = isin(list :

spark git commit: [SPARK-11947][SQL] Mark deprecated methods with "This will be removed in Spark 2.0."

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 25bbd3c16 -> 4d6bbbc03


[SPARK-11947][SQL] Mark deprecated methods with "This will be removed in Spark 
2.0."

Also fixed some documentation as I saw them.

Author: Reynold Xin 

Closes #9930 from rxin/SPARK-11947.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d6bbbc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d6bbbc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d6bbbc0

Branch: refs/heads/master
Commit: 4d6bbbc03ddb6650b00eb638e4876a196014c19c
Parents: 25bbd3c
Author: Reynold Xin 
Authored: Tue Nov 24 18:58:55 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 18:58:55 2015 -0800

--
 project/MimaExcludes.scala  |   3 +-
 .../scala/org/apache/spark/sql/Column.scala |  20 ++--
 .../scala/org/apache/spark/sql/DataFrame.scala  |  72 +++
 .../scala/org/apache/spark/sql/Dataset.scala|   1 +
 .../scala/org/apache/spark/sql/SQLContext.scala |  88 +++---
 .../org/apache/spark/sql/SQLImplicits.scala |  25 +++-
 .../org/apache/spark/sql/SparkSQLParser.scala   | 118 ---
 .../spark/sql/execution/SparkSQLParser.scala| 117 ++
 .../scala/org/apache/spark/sql/functions.scala  |  52 
 .../SimpleTextHadoopFsRelationSuite.scala   |   6 +-
 10 files changed, 282 insertions(+), 220 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d6bbbc0/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index bb45d1b..54a9ad9 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -108,7 +108,8 @@ object MimaExcludes {
 ProblemFilters.exclude[MissingClassProblem](
   "org.apache.spark.rdd.MapPartitionsWithPreparationRDD"),
 ProblemFilters.exclude[MissingClassProblem](
-  "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$")
+  "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$"),
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SparkSQLParser")
   ) ++ Seq(
 // SPARK-11485
 
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df"),

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6bbbc0/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 30c554a..b3cd9e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -42,7 +42,8 @@ private[sql] object Column {
 
 /**
  * A [[Column]] where an [[Encoder]] has been given for the expected input and 
return type.
- * @since 1.6.0
+ * To create a [[TypedColumn]], use the `as` function on a [[Column]].
+ *
  * @tparam T The input type expected for this expression.  Can be `Any` if the 
expression is type
  *   checked by the analyzer instead of the compiler (i.e. 
`expr("sum(...)")`).
  * @tparam U The output type of this column.
@@ -51,7 +52,8 @@ private[sql] object Column {
  */
 class TypedColumn[-T, U](
 expr: Expression,
-private[sql] val encoder: ExpressionEncoder[U]) extends Column(expr) {
+private[sql] val encoder: ExpressionEncoder[U])
+  extends Column(expr) {
 
   /**
* Inserts the specific input type and schema into any expressions that are 
expected to operate
@@ -61,12 +63,11 @@ class TypedColumn[-T, U](
   inputEncoder: ExpressionEncoder[_],
   schema: Seq[Attribute]): TypedColumn[T, U] = {
 val boundEncoder = 
inputEncoder.bind(schema).asInstanceOf[ExpressionEncoder[Any]]
-new TypedColumn[T, U] (expr transform {
-  case ta: TypedAggregateExpression if ta.aEncoder.isEmpty =>
-ta.copy(
-  aEncoder = Some(boundEncoder),
-  children = schema)
-}, encoder)
+new TypedColumn[T, U](
+  expr transform { case ta: TypedAggregateExpression if 
ta.aEncoder.isEmpty =>
+ta.copy(aEncoder = Some(boundEncoder), children = schema)
+  },
+  encoder)
   }
 }
 
@@ -691,8 +692,9 @@ class Column(protected[sql] val expr: Expression) extends 
Logging {
*
* @group expr_ops
* @since 1.3.0
+   * @deprecated As of 1.5.0. Use isin. This will be removed in Spark 2.0.
*/
-  @deprecated("use isin", "1.5.0")
+  @deprecated("use isin. This will be removed in Spark 2.0.", "1.5.0")
   @scala.annotation.varargs
   def in(list: Any*): Column = isin(list : _*)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6bbbc0/sql/core/src/main/scala/org/apache/spar

spark git commit: [STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()`

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 4d6bbbc03 -> a5d988763


[STREAMING][FLAKY-TEST] Catch execution context race condition in 
`FileBasedWriteAheadLog.close()`

There is a race condition in `FileBasedWriteAheadLog.close()`, where if 
delete's of old log files are in progress, the write ahead log may close, and 
result in a `RejectedExecutionException`. This is okay, and should be handled 
gracefully.

Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/

The reason the test fails is in `afterEach`, `writeAheadLog.close` is called, 
and there may still be async deletes in flight.

tdas zsxwing

Author: Burak Yavuz 

Closes #9953 from brkyvz/flaky-ss.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5d98876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5d98876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5d98876

Branch: refs/heads/master
Commit: a5d988763319f63a8e2b58673dd4f9098f17c835
Parents: 4d6bbbc
Author: Burak Yavuz 
Authored: Tue Nov 24 20:58:47 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 20:58:47 2015 -0800

--
 .../streaming/util/FileBasedWriteAheadLog.scala | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a5d98876/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1..f5165f7 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
 }
 oldLogFiles.foreach { logInfo =>
   if (!executionContext.isShutdown) {
-val f = Future { deleteFile(logInfo) }(executionContext)
-if (waitForCompletion) {
-  import scala.concurrent.duration._
-  Await.ready(f, 1 second)
+try {
+  val f = Future { deleteFile(logInfo) }(executionContext)
+  if (waitForCompletion) {
+import scala.concurrent.duration._
+Await.ready(f, 1 second)
+  }
+} catch {
+  case e: RejectedExecutionException =>
+logWarning("Execution context shutdown before deleting old 
WriteAheadLogs. " +
+  "This would not affect recovery correctness.", e)
 }
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()`

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 862d788fc -> b18112666


[STREAMING][FLAKY-TEST] Catch execution context race condition in 
`FileBasedWriteAheadLog.close()`

There is a race condition in `FileBasedWriteAheadLog.close()`, where if 
delete's of old log files are in progress, the write ahead log may close, and 
result in a `RejectedExecutionException`. This is okay, and should be handled 
gracefully.

Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/

The reason the test fails is in `afterEach`, `writeAheadLog.close` is called, 
and there may still be async deletes in flight.

tdas zsxwing

Author: Burak Yavuz 

Closes #9953 from brkyvz/flaky-ss.

(cherry picked from commit a5d988763319f63a8e2b58673dd4f9098f17c835)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1811266
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1811266
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1811266

Branch: refs/heads/branch-1.6
Commit: b18112666adec0a942d1cfe8d6b9f1e7c7201fcd
Parents: 862d788
Author: Burak Yavuz 
Authored: Tue Nov 24 20:58:47 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 20:59:01 2015 -0800

--
 .../streaming/util/FileBasedWriteAheadLog.scala | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1811266/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1..f5165f7 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
 }
 oldLogFiles.foreach { logInfo =>
   if (!executionContext.isShutdown) {
-val f = Future { deleteFile(logInfo) }(executionContext)
-if (waitForCompletion) {
-  import scala.concurrent.duration._
-  Await.ready(f, 1 second)
+try {
+  val f = Future { deleteFile(logInfo) }(executionContext)
+  if (waitForCompletion) {
+import scala.concurrent.duration._
+Await.ready(f, 1 second)
+  }
+} catch {
+  case e: RejectedExecutionException =>
+logWarning("Execution context shutdown before deleting old 
WriteAheadLogs. " +
+  "This would not affect recovery correctness.", e)
 }
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-10621][SQL] Consistent naming for functions in SQL, Python, Scala

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 b18112666 -> 486db8789


[SPARK-10621][SQL] Consistent naming for functions in SQL, Python, Scala

Author: Reynold Xin 

Closes #9948 from rxin/SPARK-10621.

(cherry picked from commit 151d7c2baf18403e6e59e97c80c8bcded6148038)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/486db878
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/486db878
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/486db878

Branch: refs/heads/branch-1.6
Commit: 486db87899bcd3d0ff222c8a31d572fe836b69e7
Parents: b181126
Author: Reynold Xin 
Authored: Tue Nov 24 21:30:53 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 21:30:59 2015 -0800

--
 python/pyspark/sql/functions.py | 111 ++---
 .../scala/org/apache/spark/sql/functions.scala  | 124 +++
 2 files changed, 196 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/486db878/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index a1ca723..e3786e0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -150,18 +150,18 @@ _binary_mathfunctions = {
 
 _window_functions = {
 'rowNumber':
-"""returns a sequential number starting at 1 within a window partition.
-
-This is equivalent to the ROW_NUMBER function in SQL.""",
+""".. note:: Deprecated in 1.6, use row_number instead.""",
+'row_number':
+"""returns a sequential number starting at 1 within a window 
partition.""",
 'denseRank':
+""".. note:: Deprecated in 1.6, use dense_rank instead.""",
+'dense_rank':
 """returns the rank of rows within a window partition, without any 
gaps.
 
 The difference between rank and denseRank is that denseRank leaves no 
gaps in ranking
 sequence when there are ties. That is, if you were ranking a 
competition using denseRank
 and had three people tie for second place, you would say that all 
three were in second
-place and that the next person came in third.
-
-This is equivalent to the DENSE_RANK function in SQL.""",
+place and that the next person came in third.""",
 'rank':
 """returns the rank of rows within a window partition.
 
@@ -172,14 +172,14 @@ _window_functions = {
 
 This is equivalent to the RANK function in SQL.""",
 'cumeDist':
+""".. note:: Deprecated in 1.6, use cume_dist instead.""",
+'cume_dist':
 """returns the cumulative distribution of values within a window 
partition,
-i.e. the fraction of rows that are below the current row.
-
-This is equivalent to the CUME_DIST function in SQL.""",
+i.e. the fraction of rows that are below the current row.""",
 'percentRank':
-"""returns the relative rank (i.e. percentile) of rows within a window 
partition.
-
-This is equivalent to the PERCENT_RANK function in SQL.""",
+""".. note:: Deprecated in 1.6, use percent_rank instead.""",
+'percent_rank':
+"""returns the relative rank (i.e. percentile) of rows within a window 
partition.""",
 }
 
 for _name, _doc in _functions.items():
@@ -189,7 +189,7 @@ for _name, _doc in _functions_1_4.items():
 for _name, _doc in _binary_mathfunctions.items():
 globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
 for _name, _doc in _window_functions.items():
-globals()[_name] = since(1.4)(_create_window_function(_name, _doc))
+globals()[_name] = since(1.6)(_create_window_function(_name, _doc))
 for _name, _doc in _functions_1_6.items():
 globals()[_name] = since(1.6)(_create_function(_name, _doc))
 del _name, _doc
@@ -288,6 +288,38 @@ def countDistinct(col, *cols):
 
 @since(1.4)
 def monotonicallyIncreasingId():
+"""
+.. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
+"""
+return monotonically_increasing_id()
+
+
+@since(1.6)
+def input_file_name():
+"""Creates a string column for the file name of the current Spark task.
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.input_file_name())
+
+
+@since(1.6)
+def isnan(col):
+"""An expression that returns true iff the column is NaN.
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.isnan(_to_java_column(col)))
+
+
+@since(1.6)
+def isnull(col):
+"""An expression that returns true iff the column is null.
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.isnull(_to_java_column(col)))
+
+
+@since(1.6)

spark git commit: [SPARK-10621][SQL] Consistent naming for functions in SQL, Python, Scala

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master a5d988763 -> 151d7c2ba


[SPARK-10621][SQL] Consistent naming for functions in SQL, Python, Scala

Author: Reynold Xin 

Closes #9948 from rxin/SPARK-10621.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/151d7c2b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/151d7c2b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/151d7c2b

Branch: refs/heads/master
Commit: 151d7c2baf18403e6e59e97c80c8bcded6148038
Parents: a5d9887
Author: Reynold Xin 
Authored: Tue Nov 24 21:30:53 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 21:30:53 2015 -0800

--
 python/pyspark/sql/functions.py | 111 ++---
 .../scala/org/apache/spark/sql/functions.scala  | 124 +++
 2 files changed, 196 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/151d7c2b/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index a1ca723..e3786e0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -150,18 +150,18 @@ _binary_mathfunctions = {
 
 _window_functions = {
 'rowNumber':
-"""returns a sequential number starting at 1 within a window partition.
-
-This is equivalent to the ROW_NUMBER function in SQL.""",
+""".. note:: Deprecated in 1.6, use row_number instead.""",
+'row_number':
+"""returns a sequential number starting at 1 within a window 
partition.""",
 'denseRank':
+""".. note:: Deprecated in 1.6, use dense_rank instead.""",
+'dense_rank':
 """returns the rank of rows within a window partition, without any 
gaps.
 
 The difference between rank and denseRank is that denseRank leaves no 
gaps in ranking
 sequence when there are ties. That is, if you were ranking a 
competition using denseRank
 and had three people tie for second place, you would say that all 
three were in second
-place and that the next person came in third.
-
-This is equivalent to the DENSE_RANK function in SQL.""",
+place and that the next person came in third.""",
 'rank':
 """returns the rank of rows within a window partition.
 
@@ -172,14 +172,14 @@ _window_functions = {
 
 This is equivalent to the RANK function in SQL.""",
 'cumeDist':
+""".. note:: Deprecated in 1.6, use cume_dist instead.""",
+'cume_dist':
 """returns the cumulative distribution of values within a window 
partition,
-i.e. the fraction of rows that are below the current row.
-
-This is equivalent to the CUME_DIST function in SQL.""",
+i.e. the fraction of rows that are below the current row.""",
 'percentRank':
-"""returns the relative rank (i.e. percentile) of rows within a window 
partition.
-
-This is equivalent to the PERCENT_RANK function in SQL.""",
+""".. note:: Deprecated in 1.6, use percent_rank instead.""",
+'percent_rank':
+"""returns the relative rank (i.e. percentile) of rows within a window 
partition.""",
 }
 
 for _name, _doc in _functions.items():
@@ -189,7 +189,7 @@ for _name, _doc in _functions_1_4.items():
 for _name, _doc in _binary_mathfunctions.items():
 globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
 for _name, _doc in _window_functions.items():
-globals()[_name] = since(1.4)(_create_window_function(_name, _doc))
+globals()[_name] = since(1.6)(_create_window_function(_name, _doc))
 for _name, _doc in _functions_1_6.items():
 globals()[_name] = since(1.6)(_create_function(_name, _doc))
 del _name, _doc
@@ -288,6 +288,38 @@ def countDistinct(col, *cols):
 
 @since(1.4)
 def monotonicallyIncreasingId():
+"""
+.. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
+"""
+return monotonically_increasing_id()
+
+
+@since(1.6)
+def input_file_name():
+"""Creates a string column for the file name of the current Spark task.
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.input_file_name())
+
+
+@since(1.6)
+def isnan(col):
+"""An expression that returns true iff the column is NaN.
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.isnan(_to_java_column(col)))
+
+
+@since(1.6)
+def isnull(col):
+"""An expression that returns true iff the column is null.
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.isnull(_to_java_column(col)))
+
+
+@since(1.6)
+def monotonically_increasing_id():
 """A column that generates monotonically increasing 64-bit integ

spark git commit: [SPARK-11140][CORE] Transfer files using network lib when using NettyRpcEnv - 1.6.version.

2015-11-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 486db8789 -> 68bcb9b33


[SPARK-11140][CORE] Transfer files using network lib when using NettyRpcEnv - 
1.6.version.

This patch is the same code as in SPARK-11140 in master, but with some added 
code to still use the HTTP file server by default in NettyRpcEnv. This is 
mostly to avoid conflicts when backporting patches to 1.6.

Author: Marcelo Vanzin 

Closes #9947 from vanzin/SPARK-11140-branch-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68bcb9b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68bcb9b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68bcb9b3

Branch: refs/heads/branch-1.6
Commit: 68bcb9b33b731af33f6c9444c8c2fc54e50b3202
Parents: 486db87
Author: Marcelo Vanzin 
Authored: Tue Nov 24 21:48:51 2015 -0800
Committer: Reynold Xin 
Committed: Tue Nov 24 21:48:51 2015 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   |   8 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |  14 --
 .../scala/org/apache/spark/rpc/RpcEnv.scala |  46 ++
 .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala  |  60 +++-
 .../spark/rpc/netty/HttpBasedFileServer.scala   |  59 
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 147 +--
 .../spark/rpc/netty/NettyStreamManager.scala|  63 
 .../scala/org/apache/spark/util/Utils.scala |   9 ++
 .../org/apache/spark/rpc/RpcEnvSuite.scala  |  39 -
 .../spark/rpc/netty/NettyRpcHandlerSuite.scala  |  10 +-
 .../spark/launcher/AbstractCommandBuilder.java  |   2 +-
 .../network/client/TransportClientFactory.java  |   6 +-
 .../network/server/TransportChannelHandler.java |   1 +
 13 files changed, 418 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/68bcb9b3/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 90480e5..e19ba11 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1379,7 +1379,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 }
 
 val key = if (!isLocal && scheme == "file") {
-  env.httpFileServer.addFile(new File(uri.getPath))
+  env.rpcEnv.fileServer.addFile(new File(uri.getPath))
 } else {
   schemeCorrectedPath
 }
@@ -1630,7 +1630,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   var key = ""
   if (path.contains("\\")) {
 // For local paths with backslashes on Windows, URI throws an exception
-key = env.httpFileServer.addJar(new File(path))
+key = env.rpcEnv.fileServer.addJar(new File(path))
   } else {
 val uri = new URI(path)
 key = uri.getScheme match {
@@ -1644,7 +1644,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   // of the AM to make it show up in the current working directory.
   val fileName = new Path(uri.getPath).getName()
   try {
-env.httpFileServer.addJar(new File(fileName))
+env.rpcEnv.fileServer.addJar(new File(fileName))
   } catch {
 case e: Exception =>
   // For now just log an error but allow to go through so 
spark examples work.
@@ -1655,7 +1655,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
 } else {
   try {
-env.httpFileServer.addJar(new File(uri.getPath))
+env.rpcEnv.fileServer.addJar(new File(uri.getPath))
   } catch {
 case exc: FileNotFoundException =>
   logError(s"Jar not found at $path")

http://git-wip-us.apache.org/repos/asf/spark/blob/68bcb9b3/core/src/main/scala/org/apache/spark/SparkEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 88df27f..84230e3 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -66,7 +66,6 @@ class SparkEnv (
 val blockTransferService: BlockTransferService,
 val blockManager: BlockManager,
 val securityManager: SecurityManager,
-val httpFileServer: HttpFileServer,
 val sparkFilesDir: String,
 val metricsSystem: MetricsSystem,
 val memoryManager: MemoryManager,
@@ -91,7 +90,6 @@ class SparkEnv (
 if (!isStopped) {

spark git commit: [SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 151d7c2ba -> 216988688


[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and 
recovered from checkpoint file

This solves the following exception caused when empty state RDD is checkpointed 
and recovered. The root cause is that an empty OpenHashMapBasedStateMap cannot 
be deserialized as the initialCapacity is set to zero.
```
Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most 
recent failure: Lost task 0.0 in stage 6.0 (TID 20, localhost): 
java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:96)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:86)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.readObject(StateMap.scala:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
```

Author: Tathagata Das 

Closes #9958 from tdas/SPARK-11979.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21698868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21698868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21698868

Branch: refs/heads/master
Commit: 2169886883d33b33acf378ac42a626576b342df1
Parents: 151d7c2
Author: Tathagata Das 
Authored: Tue Nov 24 23:13:01 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 23:13:01 2015 -0800

--
 .../apache/spark/streaming/util/StateMap.scala  | 19 -
 .../apache/spark/streaming/StateMapSuite.scala  | 30 +---
 .../streaming/rdd/TrackStateRDDSuite.scala  | 10 +++
 3 files changed, 42 insertions(+), 17 deletions(-)
--

spark git commit: [SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 68bcb9b33 -> 7f030aa42


[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and 
recovered from checkpoint file

This solves the following exception caused when empty state RDD is checkpointed 
and recovered. The root cause is that an empty OpenHashMapBasedStateMap cannot 
be deserialized as the initialCapacity is set to zero.
```
Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most 
recent failure: Lost task 0.0 in stage 6.0 (TID 20, localhost): 
java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:96)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:86)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.readObject(StateMap.scala:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
```

Author: Tathagata Das 

Closes #9958 from tdas/SPARK-11979.

(cherry picked from commit 2169886883d33b33acf378ac42a626576b342df1)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f030aa4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f030aa4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f030aa4

Branch: refs/heads/branch-1.6
Commit: 7f030aa422802a8e7077e1c74a59ab9a5fe54488
Parents: 68bcb9b
Author: Tathagata Das 
Authored: Tue Nov 24 23:13:01 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 23:13:29 2015 -0800

--
 .../apache/spark/streaming/util/StateMap.scala  | 19 -
 .../apache/spark/streaming/StateMapSuite.scala  | 30 +---
 .../streaming/rdd/TrackStateRDDSuite.scala  | 10 +++
 3 files changed