spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 1df3e8230 -> f5af299ab


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`, which is wrong:
a `SynchronousQueue` has no capacity and cannot cache any task. This patch switches to a
`LinkedBlockingQueue`, along with other fixes, so that `newDaemonCachedThreadPool` uses at
most `maxThreadNumber` threads and, beyond that, caches tasks in the `LinkedBlockingQueue`.
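
Below is a minimal standalone sketch of the configuration this patch adopts (the object and
method names are illustrative, not Spark's API): a bounded thread count plus an unbounded
LinkedBlockingQueue means extra tasks are queued instead of rejected, while
allowCoreThreadTimeOut(true) still lets idle threads exit after the keep-alive period.

    import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

    object CachedPoolSketch {
      // Illustrative helper mirroring the patched configuration.
      def newBoundedCachedPool(maxThreads: Int, keepAliveSeconds: Long): ThreadPoolExecutor = {
        val pool = new ThreadPoolExecutor(
          maxThreads,                         // corePoolSize: threads created before tasks are queued
          maxThreads,                         // maximumPoolSize: effectively unused with an unbounded queue
          keepAliveSeconds, TimeUnit.SECONDS,
          new LinkedBlockingQueue[Runnable])  // unbounded queue caches the excess tasks
        pool.allowCoreThreadTimeOut(true)     // let idle core threads time out
        pool
      }

      def main(args: Array[String]): Unit = {
        val pool = newBoundedCachedPool(maxThreads = 2, keepAliveSeconds = 2)
        (1 to 5).foreach { _ =>
          pool.execute(new Runnable {
            override def run(): Unit = Thread.sleep(500)
          })
        }
        // With only 2 threads, the remaining tasks wait in the queue instead of being dropped.
        println(s"queued tasks: ${pool.getQueue.size}")
        pool.shutdown()
      }
    }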

Author: Shixiong Zhu 

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5af299a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5af299a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5af299a

Branch: refs/heads/branch-1.4
Commit: f5af299ab06f654e000c99917c703f989bffaa43
Parents: 1df3e82
Author: Shixiong Zhu 
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 25 23:35:33 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala | 14 --
 .../apache/spark/util/ThreadUtilsSuite.scala| 45 
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index ca5624a..94d581f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 8c51e6b..1fb6364 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -23,6 +23,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -58,6 +60,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation
+      cachedThreadPool.execute(new Runnable {
+        override def run(): Unit 

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b1fcefca6 -> 7900d192e


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`, which is wrong:
a `SynchronousQueue` has no capacity and cannot cache any task. This patch switches to a
`LinkedBlockingQueue`, along with other fixes, so that `newDaemonCachedThreadPool` uses at
most `maxThreadNumber` threads and, beyond that, caches tasks in the `LinkedBlockingQueue`.

Author: Shixiong Zhu 

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7900d192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7900d192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7900d192

Branch: refs/heads/branch-1.5
Commit: 7900d192e8adf501fbed0d0704d60d2c0e63a764
Parents: b1fcefc
Author: Shixiong Zhu 
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 25 23:31:53 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala | 14 --
 .../apache/spark/util/ThreadUtilsSuite.scala| 45 
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 06976f8..3159ef7 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -57,10 +57,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation
+      cachedThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          latch.await(10,

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 7e7f2627f -> 0df6beccc


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`, which is wrong:
a `SynchronousQueue` has no capacity and cannot cache any task. This patch switches to a
`LinkedBlockingQueue`, along with other fixes, so that `newDaemonCachedThreadPool` uses at
most `maxThreadNumber` threads and, beyond that, caches tasks in the `LinkedBlockingQueue`.

Author: Shixiong Zhu 

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df6becc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df6becc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df6becc

Branch: refs/heads/branch-1.6
Commit: 0df6beccc84166a00c7c98929bf487d9cea68e1d
Parents: 7e7f262
Author: Shixiong Zhu 
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 25 23:31:36 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala | 14 --
 .../apache/spark/util/ThreadUtilsSuite.scala| 45 
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5328344..f9fbe2f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation
+      cachedThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          latch.await(10,

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 068b6438d -> d3ef69332


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`, which is wrong:
a `SynchronousQueue` has no capacity and cannot cache any task. This patch switches to a
`LinkedBlockingQueue`, along with other fixes, so that `newDaemonCachedThreadPool` uses at
most `maxThreadNumber` threads and, beyond that, caches tasks in the `LinkedBlockingQueue`.

Author: Shixiong Zhu 

Closes #9978 from zsxwing/cached-threadpool.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3ef6933
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3ef6933
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3ef6933

Branch: refs/heads/master
Commit: d3ef693325f91a1ed340c9756c81244a80398eb2
Parents: 068b643
Author: Shixiong Zhu 
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 25 23:31:21 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala | 14 --
 .../apache/spark/util/ThreadUtilsSuite.scala| 45 
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5328344..f9fbe2f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation
+      cachedThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          latch.await(10, TimeUnit.SECONDS)
+        }
+      })
+
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber

spark git commit: [SPARK-11980][SPARK-10621][SQL] Fix json_tuple and add test cases for

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master d1930ec01 -> 068b6438d


[SPARK-11980][SPARK-10621][SQL] Fix json_tuple and add test cases for

Added Python test cases for the function `isnan`, `isnull`, `nanvl` and 
`json_tuple`.

Fixed a bug in the function `json_tuple`.

rxin, could you help me review my changes? Please let me know if anything is missing.

Thank you! Have a good Thanksgiving day!

Author: gatorsmile 

Closes #9977 from gatorsmile/json_tuple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/068b6438
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/068b6438
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/068b6438

Branch: refs/heads/master
Commit: 068b6438d6886ce5b4aa698383866f466d913d66
Parents: d1930ec
Author: gatorsmile 
Authored: Wed Nov 25 23:24:33 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 23:24:33 2015 -0800

--
 python/pyspark/sql/functions.py | 44 
 1 file changed, 34 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/068b6438/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index e3786e0..9062594 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -286,14 +286,6 @@ def countDistinct(col, *cols):
 return Column(jc)
 
 
-@since(1.4)
-def monotonicallyIncreasingId():
-"""
-.. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
-"""
-return monotonically_increasing_id()
-
-
 @since(1.6)
 def input_file_name():
 """Creates a string column for the file name of the current Spark task.
@@ -305,6 +297,10 @@ def input_file_name():
 @since(1.6)
 def isnan(col):
 """An expression that returns true iff the column is NaN.
+
+>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
+>>> df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).collect()
+[Row(r1=False, r2=False), Row(r1=True, r2=True)]
 """
 sc = SparkContext._active_spark_context
 return Column(sc._jvm.functions.isnan(_to_java_column(col)))
@@ -313,11 +309,23 @@ def isnan(col):
 @since(1.6)
 def isnull(col):
 """An expression that returns true iff the column is null.
+
+>>> df = sqlContext.createDataFrame([(1, None), (None, 2)], ("a", "b"))
+>>> df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).collect()
+[Row(r1=False, r2=False), Row(r1=True, r2=True)]
 """
 sc = SparkContext._active_spark_context
 return Column(sc._jvm.functions.isnull(_to_java_column(col)))
 
 
+@since(1.4)
+def monotonicallyIncreasingId():
+"""
+.. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
+"""
+return monotonically_increasing_id()
+
+
 @since(1.6)
 def monotonically_increasing_id():
 """A column that generates monotonically increasing 64-bit integers.
@@ -344,6 +352,10 @@ def nanvl(col1, col2):
 """Returns col1 if it is not NaN, or col2 if col1 is NaN.
 
 Both inputs should be floating point columns (DoubleType or FloatType).
+
+>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
+>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect()
+[Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]
 """
 sc = SparkContext._active_spark_context
 return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2)))
@@ -1460,6 +1472,7 @@ def explode(col):
 return Column(jc)
 
 
+@ignore_unicode_prefix
 @since(1.6)
 def get_json_object(col, path):
 """
@@ -1468,22 +1481,33 @@ def get_json_object(col, path):
 
 :param col: string column in json format
 :param path: path to the json object to extract
+
+>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": 
"value12"}''')]
+>>> df = sqlContext.createDataFrame(data, ("key", "jstring"))
+>>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \
+  get_json_object(df.jstring, '$.f2').alias("c1") 
).collect()
+[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', 
c1=None)]
 """
 sc = SparkContext._active_spark_context
 jc = sc._jvm.functions.get_json_object(_to_java_column(col), path)
 return Column(jc)
 
 
+@ignore_unicode_prefix
 @since(1.6)
-def json_tuple(col, fields):
+def json_tuple(col, *fields):
 """Creates a new row for a json column according to the given field names.
 
 :param col: string column in json format
 :param fields: list of fields to extract
 
+>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''

spark git commit: [SPARK-11980][SPARK-10621][SQL] Fix json_tuple and add test cases for

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d40bf9ad8 -> 7e7f2627f


[SPARK-11980][SPARK-10621][SQL] Fix json_tuple and add test cases for

Added Python test cases for the function `isnan`, `isnull`, `nanvl` and 
`json_tuple`.

Fixed a bug in the function `json_tuple`.

rxin, could you help me review my changes? Please let me know if anything is missing.

Thank you! Have a good Thanksgiving day!

Author: gatorsmile 

Closes #9977 from gatorsmile/json_tuple.

(cherry picked from commit 068b6438d6886ce5b4aa698383866f466d913d66)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e7f2627
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e7f2627
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e7f2627

Branch: refs/heads/branch-1.6
Commit: 7e7f2627f941585a6fb1e086e22d6d1d25b692ab
Parents: d40bf9a
Author: gatorsmile 
Authored: Wed Nov 25 23:24:33 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 23:24:40 2015 -0800

--
 python/pyspark/sql/functions.py | 44 
 1 file changed, 34 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7e7f2627/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index e3786e0..9062594 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -286,14 +286,6 @@ def countDistinct(col, *cols):
 return Column(jc)
 
 
-@since(1.4)
-def monotonicallyIncreasingId():
-"""
-.. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
-"""
-return monotonically_increasing_id()
-
-
 @since(1.6)
 def input_file_name():
 """Creates a string column for the file name of the current Spark task.
@@ -305,6 +297,10 @@ def input_file_name():
 @since(1.6)
 def isnan(col):
 """An expression that returns true iff the column is NaN.
+
+>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
+>>> df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).collect()
+[Row(r1=False, r2=False), Row(r1=True, r2=True)]
 """
 sc = SparkContext._active_spark_context
 return Column(sc._jvm.functions.isnan(_to_java_column(col)))
@@ -313,11 +309,23 @@ def isnan(col):
 @since(1.6)
 def isnull(col):
 """An expression that returns true iff the column is null.
+
+>>> df = sqlContext.createDataFrame([(1, None), (None, 2)], ("a", "b"))
+>>> df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).collect()
+[Row(r1=False, r2=False), Row(r1=True, r2=True)]
 """
 sc = SparkContext._active_spark_context
 return Column(sc._jvm.functions.isnull(_to_java_column(col)))
 
 
+@since(1.4)
+def monotonicallyIncreasingId():
+"""
+.. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
+"""
+return monotonically_increasing_id()
+
+
 @since(1.6)
 def monotonically_increasing_id():
 """A column that generates monotonically increasing 64-bit integers.
@@ -344,6 +352,10 @@ def nanvl(col1, col2):
 """Returns col1 if it is not NaN, or col2 if col1 is NaN.
 
 Both inputs should be floating point columns (DoubleType or FloatType).
+
+>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
+>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect()
+[Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]
 """
 sc = SparkContext._active_spark_context
 return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2)))
@@ -1460,6 +1472,7 @@ def explode(col):
 return Column(jc)
 
 
+@ignore_unicode_prefix
 @since(1.6)
 def get_json_object(col, path):
 """
@@ -1468,22 +1481,33 @@ def get_json_object(col, path):
 
 :param col: string column in json format
 :param path: path to the json object to extract
+
+>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": 
"value12"}''')]
+>>> df = sqlContext.createDataFrame(data, ("key", "jstring"))
+>>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \
+  get_json_object(df.jstring, '$.f2').alias("c1") 
).collect()
+[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', 
c1=None)]
 """
 sc = SparkContext._active_spark_context
 jc = sc._jvm.functions.get_json_object(_to_java_column(col), path)
 return Column(jc)
 
 
+@ignore_unicode_prefix
 @since(1.6)
-def json_tuple(col, fields):
+def json_tuple(col, *fields):
 """Creates a new row for a json column according to the given field names.
 
 :param col: string column in json format
   

spark git commit: [SPARK-12003] [SQL] remove the prefix for name after expanded star

2015-11-25 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 399739702 -> d40bf9ad8


[SPARK-12003] [SQL] remove the prefix for name after expanded star

Right now, the expanded star includes the name of the expression as a prefix for each
column, which is no better than not expanding at all; we should not have the prefix.
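
For illustration, a small sketch of the behavior after this change, assuming a 1.6-era
SQLContext setup (the object name and column names below are made up): expanding a struct
star yields the bare field names instead of prefixed ones.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions.struct

    object ExpandStarSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("expand-star"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // A DataFrame with a single struct column "nested" holding fields "i" and "s".
        val df = Seq((1, "x")).toDF("i", "s").select(struct($"i", $"s").as("nested"))
        df.registerTempTable("t")

        // Expanding the struct star: after this patch the result columns are
        // named "i" and "s" rather than carrying a prefix derived from "nested".
        println(sqlContext.sql("SELECT nested.* FROM t").columns.mkString(", "))
        sc.stop()
      }
    }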

Author: Davies Liu 

Closes #9984 from davies/expand_star.

(cherry picked from commit d1930ec01ab5a9d83f801f8ae8d4f15a38d98b76)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d40bf9ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d40bf9ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d40bf9ad

Branch: refs/heads/branch-1.6
Commit: d40bf9ad881f4ba6c550cd61acc2e8c29c9dc60f
Parents: 3997397
Author: Davies Liu 
Authored: Wed Nov 25 21:25:20 2015 -0800
Committer: Davies Liu 
Committed: Wed Nov 25 21:25:31 2015 -0800

--
 .../scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d40bf9ad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 1b2a8dc..4f89b46 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -204,7 +204,7 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
         case s: StructType => s.zipWithIndex.map {
           case (f, i) =>
             val extract = GetStructField(attribute.get, i)
-            Alias(extract, target.get + "." + f.name)()
+            Alias(extract, f.name)()
         }
 
         case _ => {


spark git commit: [SPARK-12003] [SQL] remove the prefix for name after expanded star

2015-11-25 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master cc243a079 -> d1930ec01


[SPARK-12003] [SQL] remove the prefix for name after expanded star

Right now, the expanded star includes the name of the expression as a prefix for each
column, which is no better than not expanding at all; we should not have the prefix.

Author: Davies Liu 

Closes #9984 from davies/expand_star.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1930ec0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1930ec0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1930ec0

Branch: refs/heads/master
Commit: d1930ec01ab5a9d83f801f8ae8d4f15a38d98b76
Parents: cc243a0
Author: Davies Liu 
Authored: Wed Nov 25 21:25:20 2015 -0800
Committer: Davies Liu 
Committed: Wed Nov 25 21:25:20 2015 -0800

--
 .../scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d1930ec0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 1b2a8dc..4f89b46 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -204,7 +204,7 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
         case s: StructType => s.zipWithIndex.map {
           case (f, i) =>
             val extract = GetStructField(attribute.get, i)
-            Alias(extract, target.get + "." + f.name)()
+            Alias(extract, f.name)()
         }
 
         case _ => {


spark git commit: [SPARK-11206] Support SQL UI on the history server

2015-11-25 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 21e560641 -> cc243a079


[SPARK-11206] Support SQL UI on the history server

On the live web UI, there is a SQL tab which provides valuable information about the SQL
query. But once the workload is finished, we won't see the SQL tab on the history server.
It will be helpful to support the SQL UI on the history server so we can analyze a query
even after its execution.

To support SQL UI on the history server:
1. I added an `onOtherEvent` method to the `SparkListener` trait and post all 
SQL related events to the same event bus.
2. Two SQL events `SparkListenerSQLExecutionStart` and 
`SparkListenerSQLExecutionEnd` are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4.  A new trait `SparkHistoryListenerFactory` is added to allow the history 
server to feed events to the SQL history listener. The SQL implementation is 
loaded at runtime using `java.util.ServiceLoader`.

Author: Carson Wang 

Closes #9297 from carsonwang/SqlHistoryUI.
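
A minimal sketch, not part of the patch, of how a custom listener could observe the new
pass-through events once `onOtherEvent` is available on `SparkListener`; matching on class
names is only to keep the sketch free of a dependency on the sql module.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

    class SqlEventLogger extends SparkListener {
      // Events that are not core scheduler events arrive here and can be inspected.
      override def onOtherEvent(event: SparkListenerEvent): Unit = {
        val name = event.getClass.getSimpleName
        if (name == "SparkListenerSQLExecutionStart" || name == "SparkListenerSQLExecutionEnd") {
          println(s"SQL event observed: $name")
        }
      }
    }

Such a listener would typically be registered with `sc.addSparkListener(new SqlEventLogger)`.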


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc243a07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc243a07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc243a07

Branch: refs/heads/master
Commit: cc243a079b1c039d6e7f0b410d1654d94a090e14
Parents: 21e5606
Author: Carson Wang 
Authored: Wed Nov 25 15:13:13 2015 -0800
Committer: Marcelo Vanzin 
Committed: Wed Nov 25 15:13:13 2015 -0800

--
 .rat-excludes   |   1 +
 .../org/apache/spark/JavaSparkListener.java |   3 +
 .../org/apache/spark/SparkFirehoseListener.java |   4 +
 .../spark/scheduler/EventLoggingListener.scala  |   4 +
 .../apache/spark/scheduler/SparkListener.scala  |  24 +++-
 .../spark/scheduler/SparkListenerBus.scala  |   1 +
 .../scala/org/apache/spark/ui/SparkUI.scala |  16 ++-
 .../org/apache/spark/util/JsonProtocol.scala|  11 +-
 spark.scheduler.SparkHistoryListenerFactory |   1 +
 .../scala/org/apache/spark/sql/SQLContext.scala |  18 ++-
 .../spark/sql/execution/SQLExecution.scala  |  24 +---
 .../spark/sql/execution/SparkPlanInfo.scala |  46 ++
 .../sql/execution/metric/SQLMetricInfo.scala|  30 
 .../spark/sql/execution/metric/SQLMetrics.scala |  56 +---
 .../spark/sql/execution/ui/ExecutionPage.scala  |   4 +-
 .../spark/sql/execution/ui/SQLListener.scala| 139 +--
 .../apache/spark/sql/execution/ui/SQLTab.scala  |  12 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala |  20 +--
 .../sql/execution/metric/SQLMetricsSuite.scala  |   4 +-
 .../sql/execution/ui/SQLListenerSuite.scala |  43 +++---
 .../spark/sql/test/SharedSQLContext.scala   |   1 +
 21 files changed, 327 insertions(+), 135 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/.rat-excludes
--
diff --git a/.rat-excludes b/.rat-excludes
index 08fba6d..7262c96 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -82,4 +82,5 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
+org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/java/org/apache/spark/JavaSparkListener.java
--
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java 
b/core/src/main/java/org/apache/spark/JavaSparkListener.java
index fa9acf0..23bc9a2 100644
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -82,4 +82,7 @@ public class JavaSparkListener implements SparkListener {
   @Override
   public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
 
+  @Override
+  public void onOtherEvent(SparkListenerEvent event) { }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
--
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java 
b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 1214d05..e6b24af 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -118,4 +118,8 @@ public class SparkFirehoseListener implements SparkListener 
{
 onEvent(blockUpdated);
 }
 
+@Override
+public void onOtherEvent(SparkListenerEvent event) {
+onEvent(event);
+}
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
---

spark git commit: [SPARK-11983][SQL] remove all unused codegen fallback trait

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master ecac28354 -> 21e560641


[SPARK-11983][SQL] remove all unused codegen fallback trait

Author: Daoyuan Wang 

Closes #9966 from adrian-wang/removeFallback.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21e56064
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21e56064
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21e56064

Branch: refs/heads/master
Commit: 21e5606419c4b7462d30580c549e9bfa0123ae23
Parents: ecac283
Author: Daoyuan Wang 
Authored: Wed Nov 25 13:51:30 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 13:51:30 2015 -0800

--
 .../scala/org/apache/spark/sql/catalyst/expressions/Cast.scala   | 3 +--
 .../spark/sql/catalyst/expressions/regexpExpressions.scala   | 4 ++--
 .../spark/sql/catalyst/expressions/NonFoldableLiteral.scala  | 3 +--
 3 files changed, 4 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/21e56064/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 533d17e..a2c6c39 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -104,8 +104,7 @@ object Cast {
 }
 
 /** Cast the child expression to the target data type. */
-case class Cast(child: Expression, dataType: DataType)
-  extends UnaryExpression with CodegenFallback {
+case class Cast(child: Expression, dataType: DataType) extends UnaryExpression 
{
 
   override def toString: String = s"cast($child as ${dataType.simpleString})"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/21e56064/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
index 9e484c5..adef605 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
@@ -66,7 +66,7 @@ trait StringRegexExpression extends ImplicitCastInputTypes {
  * Simple RegEx pattern matching function
  */
 case class Like(left: Expression, right: Expression)
-  extends BinaryExpression with StringRegexExpression with CodegenFallback {
+  extends BinaryExpression with StringRegexExpression {
 
   override def escape(v: String): String = StringUtils.escapeLikeRegex(v)
 
@@ -117,7 +117,7 @@ case class Like(left: Expression, right: Expression)
 
 
 case class RLike(left: Expression, right: Expression)
-  extends BinaryExpression with StringRegexExpression with CodegenFallback {
+  extends BinaryExpression with StringRegexExpression {
 
   override def escape(v: String): String = v
   override def matches(regex: Pattern, str: String): Boolean = 
regex.matcher(str).find(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/21e56064/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala
index 31ecf4a..118fd69 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala
@@ -26,8 +26,7 @@ import org.apache.spark.sql.types._
  * A literal value that is not foldable. Used in expression codegen testing to 
test code path
  * that behave differently based on foldable values.
  */
-case class NonFoldableLiteral(value: Any, dataType: DataType)
-  extends LeafExpression with CodegenFallback {
+case class NonFoldableLiteral(value: Any, dataType: DataType) extends 
LeafExpression {
 
   override def foldable: Boolean = false
   override def nullable: Boolean = true


spark git commit: Fix Aggregator documentation (rename present to finish).

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 cd86d8c74 -> 399739702


Fix Aggregator documentation (rename present to finish).

(cherry picked from commit ecac2835458bbf73fe59413d5bf921500c5b987d)
Signed-off-by: Reynold Xin 
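
For context, a self-contained sketch of the aggregator that the corrected doc example
describes, assuming the 1.6-era Aggregator API with the four methods shown in the example:

    import org.apache.spark.sql.expressions.Aggregator

    case class Data(i: Int)

    object SumOfI extends Aggregator[Data, Int, Int] {
      def zero: Int = 0                           // initial buffer value
      def reduce(b: Int, a: Data): Int = b + a.i  // fold one input into the buffer
      def merge(b1: Int, b2: Int): Int = b1 + b2  // combine two partial buffers
      def finish(r: Int): Int = r                 // produce the final result
    }

With the appropriate implicit encoders in scope it would be used as
`ds.select(SumOfI.toColumn)`, matching the surrounding example.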


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39973970
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39973970
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39973970

Branch: refs/heads/branch-1.6
Commit: 3997397024975b8affca0d609a231cde5e9959a5
Parents: cd86d8c
Author: Reynold Xin 
Authored: Wed Nov 25 13:45:41 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 13:48:15 2015 -0800

--
 .../main/scala/org/apache/spark/sql/expressions/Aggregator.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/39973970/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
index b0cd32b..65117d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Encoder, TypedColumn}
  *     def zero: Int = 0
  *     def reduce(b: Int, a: Data): Int = b + a.i
  *     def merge(b1: Int, b2: Int): Int = b1 + b2
- *     def present(r: Int): Int = r
+ *     def finish(r: Int): Int = r
  *   }.toColumn()
  *
  *   val ds: Dataset[Data] = ...


spark git commit: Fix Aggregator documentation (rename present to finish).

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4e81783e9 -> ecac28354


Fix Aggregator documentation (rename present to finish).


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecac2835
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecac2835
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecac2835

Branch: refs/heads/master
Commit: ecac2835458bbf73fe59413d5bf921500c5b987d
Parents: 4e81783
Author: Reynold Xin 
Authored: Wed Nov 25 13:45:41 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 13:45:41 2015 -0800

--
 .../main/scala/org/apache/spark/sql/expressions/Aggregator.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ecac2835/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
index b0cd32b..65117d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Encoder, TypedColumn}
  *     def zero: Int = 0
  *     def reduce(b: Int, a: Data): Int = b + a.i
  *     def merge(b1: Int, b2: Int): Int = b1 + b2
- *     def present(r: Int): Int = r
+ *     def finish(r: Int): Int = r
  *   }.toColumn()
  *
  *   val ds: Dataset[Data] = ...


spark git commit: [SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up.

2015-11-25 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 849ddb6ae -> cd86d8c74


[SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up.

This change does a couple of different things to make sure that the RpcEnv-level
code and the network library agree about the status of outstanding RPCs.

For RPCs that do not expect a reply ("RpcEnv.send"), support for one way
messages (hello CORBA!) was added to the network layer. This is a
"fire and forget" message that does not require any state to be kept
by the TransportClient; as a result, the RpcEnv 'Ack' message is not needed
anymore.

For RPCs that do expect a reply ("RpcEnv.ask"), the network library now
returns the internal RPC id; if the RpcEnv layer decides to time out the
RPC before the network layer does, it now asks the TransportClient to
forget about the RPC, so that if the network-level timeout occurs, the
client is not killed.

As part of implementing the above, I cleaned up some of the code in the
netty rpc backend, removing types that were not necessary and factoring
out some common code. Of interest is a slight change in the exceptions
when posting messages to a stopped RpcEnv; that's mostly to avoid nasty
error messages from the local-cluster backend when shutting down, which
pollutes the terminal output.
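
A generic sketch of the bookkeeping described above, not Spark's actual network classes:
an ask-style call registers its RPC id, the caller-side timeout can forget that id, and a
late network-level reply or failure for a forgotten id is dropped instead of killing the
client.

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicLong
    import scala.concurrent.Promise
    import scala.util.Try

    class OutstandingRpcs[T] {
      private val nextId = new AtomicLong(0)
      private val pending = new ConcurrentHashMap[Long, Promise[T]]()

      // Register an outstanding RPC and return its id (mirroring "the network
      // library now returns the internal RPC id").
      def register(p: Promise[T]): Long = {
        val id = nextId.getAndIncrement()
        pending.put(id, p)
        id
      }

      // Called by the caller-side timeout: forget the RPC before the network layer times out.
      def forget(id: Long): Unit = pending.remove(id)

      // Called when a reply or a network failure arrives; a forgotten id is simply ignored.
      def complete(id: Long, result: Try[T]): Unit = {
        val p = pending.remove(id)
        if (p != null) p.tryComplete(result)
      }
    }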

Author: Marcelo Vanzin 

Closes #9917 from vanzin/SPARK-11866.

(cherry picked from commit 4e81783e92f464d479baaf93eccc3adb1496989a)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd86d8c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd86d8c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd86d8c7

Branch: refs/heads/branch-1.6
Commit: cd86d8c745b745b120ec00cd466ac6cdb05296a6
Parents: 849ddb6
Author: Marcelo Vanzin 
Authored: Wed Nov 25 12:58:18 2015 -0800
Committer: Marcelo Vanzin 
Committed: Wed Nov 25 12:58:30 2015 -0800

--
 .../spark/deploy/worker/ExecutorRunner.scala|   6 +-
 .../org/apache/spark/rpc/netty/Dispatcher.scala |  55 +++
 .../org/apache/spark/rpc/netty/Inbox.scala  |  28 ++--
 .../spark/rpc/netty/NettyRpcCallContext.scala   |  35 +
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 153 ---
 .../org/apache/spark/rpc/netty/Outbox.scala |  64 ++--
 .../org/apache/spark/rpc/netty/InboxSuite.scala |   6 +-
 .../spark/rpc/netty/NettyRpcHandlerSuite.scala  |   2 +-
 .../spark/network/client/TransportClient.java   |  34 -
 .../apache/spark/network/protocol/Message.java  |   4 +-
 .../spark/network/protocol/MessageDecoder.java  |   3 +
 .../spark/network/protocol/OneWayMessage.java   |  75 +
 .../spark/network/sasl/SaslRpcHandler.java  |   5 +
 .../apache/spark/network/server/RpcHandler.java |  36 +
 .../network/server/TransportRequestHandler.java |  18 ++-
 .../org/apache/spark/network/ProtocolSuite.java |   2 +
 .../spark/network/RpcIntegrationSuite.java  |  31 
 .../spark/network/sasl/SparkSaslSuite.java  |   9 ++
 18 files changed, 374 insertions(+), 192 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cd86d8c7/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 3aef051..25a1747 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -92,7 +92,11 @@ private[deploy] class ExecutorRunner(
   process.destroy()
   exitCode = Some(process.waitFor())
 }
-worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode))
+try {
+  worker.send(ExecutorStateChanged(appId, execId, state, message, 
exitCode))
+} catch {
+  case e: IllegalStateException => logWarning(e.getMessage(), e)
+}
   }
 
   /** Stop this executor runner, including killing the process it launched */

http://git-wip-us.apache.org/repos/asf/spark/blob/cd86d8c7/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index eb25d6c..533c984 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -106,44 +106,30 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
 val iter = endpoints.keySet().iterator()
 while (iter.hasNext) {
   val name = iter.next
-  post

spark git commit: [SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up.

2015-11-25 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master d29e2ef4c -> 4e81783e9


[SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up.

This change does a couple of different things to make sure that the RpcEnv-level
code and the network library agree about the status of outstanding RPCs.

For RPCs that do not expect a reply ("RpcEnv.send"), support for one way
messages (hello CORBA!) was added to the network layer. This is a
"fire and forget" message that does not require any state to be kept
by the TransportClient; as a result, the RpcEnv 'Ack' message is not needed
anymore.

For RPCs that do expect a reply ("RpcEnv.ask"), the network library now
returns the internal RPC id; if the RpcEnv layer decides to time out the
RPC before the network layer does, it now asks the TransportClient to
forget about the RPC, so that if the network-level timeout occurs, the
client is not killed.

As part of implementing the above, I cleaned up some of the code in the
netty rpc backend, removing types that were not necessary and factoring
out some common code. Of interest is a slight change in the exceptions
when posting messages to a stopped RpcEnv; that's mostly to avoid nasty
error messages from the local-cluster backend when shutting down, which
pollutes the terminal output.

Author: Marcelo Vanzin 

Closes #9917 from vanzin/SPARK-11866.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e81783e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e81783e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e81783e

Branch: refs/heads/master
Commit: 4e81783e92f464d479baaf93eccc3adb1496989a
Parents: d29e2ef
Author: Marcelo Vanzin 
Authored: Wed Nov 25 12:58:18 2015 -0800
Committer: Marcelo Vanzin 
Committed: Wed Nov 25 12:58:18 2015 -0800

--
 .../spark/deploy/worker/ExecutorRunner.scala|   6 +-
 .../org/apache/spark/rpc/netty/Dispatcher.scala |  55 +++
 .../org/apache/spark/rpc/netty/Inbox.scala  |  28 ++--
 .../spark/rpc/netty/NettyRpcCallContext.scala   |  35 +
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 153 ---
 .../org/apache/spark/rpc/netty/Outbox.scala |  64 ++--
 .../org/apache/spark/rpc/netty/InboxSuite.scala |   6 +-
 .../spark/rpc/netty/NettyRpcHandlerSuite.scala  |   2 +-
 .../spark/network/client/TransportClient.java   |  34 -
 .../apache/spark/network/protocol/Message.java  |   4 +-
 .../spark/network/protocol/MessageDecoder.java  |   3 +
 .../spark/network/protocol/OneWayMessage.java   |  75 +
 .../spark/network/sasl/SaslRpcHandler.java  |   5 +
 .../apache/spark/network/server/RpcHandler.java |  36 +
 .../network/server/TransportRequestHandler.java |  18 ++-
 .../org/apache/spark/network/ProtocolSuite.java |   2 +
 .../spark/network/RpcIntegrationSuite.java  |  31 
 .../spark/network/sasl/SparkSaslSuite.java  |   9 ++
 18 files changed, 374 insertions(+), 192 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4e81783e/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 3aef051..25a1747 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -92,7 +92,11 @@ private[deploy] class ExecutorRunner(
   process.destroy()
   exitCode = Some(process.waitFor())
 }
-worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode))
+try {
+  worker.send(ExecutorStateChanged(appId, execId, state, message, 
exitCode))
+} catch {
+  case e: IllegalStateException => logWarning(e.getMessage(), e)
+}
   }
 
   /** Stop this executor runner, including killing the process it launched */

http://git-wip-us.apache.org/repos/asf/spark/blob/4e81783e/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index eb25d6c..533c984 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -106,44 +106,30 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
 val iter = endpoints.keySet().iterator()
 while (iter.hasNext) {
   val name = iter.next
-  postMessage(
-name,
-_ => message,
-() => { logWarning(s"Drop $message because $name has 

spark git commit: [SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java

2015-11-25 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 88875d941 -> d29e2ef4c


[SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and 
TransformFunctionSerializer to Java

The Python exception traceback in TransformFunction and TransformFunctionSerializer is not
sent back to Java. Py4J just throws a very general exception, which is hard to debug.

This PR adds a `getFailure` method to get the failure message on the Java side.
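
A rough sketch of the check-and-rethrow pattern this describes (the trait and method names
here are hypothetical stand-ins; the real callback interface lives in PythonDStream.scala):
the Scala side asks the Python callback for a stored traceback and surfaces it instead of a
generic Py4J error.

    import org.apache.spark.SparkException

    object FailureCheckSketch {
      // Hypothetical stand-in for a Python-backed callback exposing getFailure.
      trait FailureTrackingCallback {
        def getFailure: String // formatted Python traceback of the last failure, or null
      }

      def callAndRethrow[T](callback: FailureTrackingCallback)(body: => T): T = {
        val result = body
        val failure = callback.getFailure
        if (failure != null) {
          throw new SparkException("An exception was raised by Python:\n" + failure)
        }
        result
      }
    }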

Author: Shixiong Zhu 

Closes #9922 from zsxwing/SPARK-11935.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d29e2ef4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d29e2ef4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d29e2ef4

Branch: refs/heads/master
Commit: d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0
Parents: 88875d9
Author: Shixiong Zhu 
Authored: Wed Nov 25 11:47:21 2015 -0800
Committer: Tathagata Das 
Committed: Wed Nov 25 11:47:21 2015 -0800

--
 python/pyspark/streaming/tests.py   | 82 +++-
 python/pyspark/streaming/util.py| 29 ---
 .../streaming/api/python/PythonDStream.scala| 52 ++---
 3 files changed, 144 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d29e2ef4/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index a0e0267..d380d69 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -404,17 +404,69 @@ class BasicOperationTests(PySparkStreamingTestCase):
 self._test_func(input, func, expected)
 
 def test_failed_func(self):
+# Test failure in
+# TransformFunction.apply(rdd: Option[RDD[_]], time: Time)
 input = [self.sc.parallelize([d], 1) for d in range(4)]
 input_stream = self.ssc.queueStream(input)
 
 def failed_func(i):
-raise ValueError("failed")
+raise ValueError("This is a special error")
 
 input_stream.map(failed_func).pprint()
 self.ssc.start()
 try:
 self.ssc.awaitTerminationOrTimeout(10)
 except:
+import traceback
+failure = traceback.format_exc()
+self.assertTrue("This is a special error" in failure)
+return
+
+self.fail("a failed func should throw an error")
+
+def test_failed_func2(self):
+# Test failure in
+# TransformFunction.apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], 
time: Time)
+input = [self.sc.parallelize([d], 1) for d in range(4)]
+input_stream1 = self.ssc.queueStream(input)
+input_stream2 = self.ssc.queueStream(input)
+
+def failed_func(rdd1, rdd2):
+raise ValueError("This is a special error")
+
+input_stream1.transformWith(failed_func, input_stream2, True).pprint()
+self.ssc.start()
+try:
+self.ssc.awaitTerminationOrTimeout(10)
+except:
+import traceback
+failure = traceback.format_exc()
+self.assertTrue("This is a special error" in failure)
+return
+
+self.fail("a failed func should throw an error")
+
+def test_failed_func_with_reseting_failure(self):
+input = [self.sc.parallelize([d], 1) for d in range(4)]
+input_stream = self.ssc.queueStream(input)
+
+def failed_func(i):
+if i == 1:
+# Make it fail in the second batch
+raise ValueError("This is a special error")
+else:
+return i
+
+# We should be able to see the results of the 3rd and 4th batches even 
if the second batch
+# fails
+expected = [[0], [2], [3]]
+self.assertEqual(expected, 
self._collect(input_stream.map(failed_func), 3))
+try:
+self.ssc.awaitTerminationOrTimeout(10)
+except:
+import traceback
+failure = traceback.format_exc()
+self.assertTrue("This is a special error" in failure)
 return
 
 self.fail("a failed func should throw an error")
@@ -780,6 +832,34 @@ class CheckpointTests(unittest.TestCase):
 if self.cpd is not None:
 shutil.rmtree(self.cpd)
 
+def test_transform_function_serializer_failure(self):
+inputd = tempfile.mkdtemp()
+self.cpd = 
tempfile.mkdtemp("test_transform_function_serializer_failure")
+
+def setup():
+conf = SparkConf().set("spark.default.parallelism", 1)
+sc = SparkContext(conf=conf)
+ssc = StreamingContext(sc, 0.5)
+
+# A function that cannot be serialized
+def process(time, rdd):
+

spark git commit: [SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java

2015-11-25 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 b4cf318ab -> 849ddb6ae


[SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and 
TransformFunctionSerializer to Java

The Python exception traceback in TransformFunction and TransformFunctionSerializer 
is not sent back to Java. Py4j just throws a very general exception, which is 
hard to debug.

This PR adds a `getFailure` method to retrieve the failure message on the Java side.

Author: Shixiong Zhu 

Closes #9922 from zsxwing/SPARK-11935.

(cherry picked from commit d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/849ddb6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/849ddb6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/849ddb6a

Branch: refs/heads/branch-1.6
Commit: 849ddb6ae69416434173824d50d59ebc8b4dbbf5
Parents: b4cf318
Author: Shixiong Zhu 
Authored: Wed Nov 25 11:47:21 2015 -0800
Committer: Tathagata Das 
Committed: Wed Nov 25 11:47:33 2015 -0800

--
 python/pyspark/streaming/tests.py   | 82 +++-
 python/pyspark/streaming/util.py| 29 ---
 .../streaming/api/python/PythonDStream.scala| 52 ++---
 3 files changed, 144 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/849ddb6a/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index a0e0267..d380d69 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -404,17 +404,69 @@ class BasicOperationTests(PySparkStreamingTestCase):
 self._test_func(input, func, expected)
 
 def test_failed_func(self):
+# Test failure in
+# TransformFunction.apply(rdd: Option[RDD[_]], time: Time)
 input = [self.sc.parallelize([d], 1) for d in range(4)]
 input_stream = self.ssc.queueStream(input)
 
 def failed_func(i):
-raise ValueError("failed")
+raise ValueError("This is a special error")
 
 input_stream.map(failed_func).pprint()
 self.ssc.start()
 try:
 self.ssc.awaitTerminationOrTimeout(10)
 except:
+import traceback
+failure = traceback.format_exc()
+self.assertTrue("This is a special error" in failure)
+return
+
+self.fail("a failed func should throw an error")
+
+def test_failed_func2(self):
+# Test failure in
+# TransformFunction.apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], 
time: Time)
+input = [self.sc.parallelize([d], 1) for d in range(4)]
+input_stream1 = self.ssc.queueStream(input)
+input_stream2 = self.ssc.queueStream(input)
+
+def failed_func(rdd1, rdd2):
+raise ValueError("This is a special error")
+
+input_stream1.transformWith(failed_func, input_stream2, True).pprint()
+self.ssc.start()
+try:
+self.ssc.awaitTerminationOrTimeout(10)
+except:
+import traceback
+failure = traceback.format_exc()
+self.assertTrue("This is a special error" in failure)
+return
+
+self.fail("a failed func should throw an error")
+
+def test_failed_func_with_reseting_failure(self):
+input = [self.sc.parallelize([d], 1) for d in range(4)]
+input_stream = self.ssc.queueStream(input)
+
+def failed_func(i):
+if i == 1:
+# Make it fail in the second batch
+raise ValueError("This is a special error")
+else:
+return i
+
+# We should be able to see the results of the 3rd and 4th batches even 
if the second batch
+# fails
+expected = [[0], [2], [3]]
+self.assertEqual(expected, 
self._collect(input_stream.map(failed_func), 3))
+try:
+self.ssc.awaitTerminationOrTimeout(10)
+except:
+import traceback
+failure = traceback.format_exc()
+self.assertTrue("This is a special error" in failure)
 return
 
 self.fail("a failed func should throw an error")
@@ -780,6 +832,34 @@ class CheckpointTests(unittest.TestCase):
 if self.cpd is not None:
 shutil.rmtree(self.cpd)
 
+def test_transform_function_serializer_failure(self):
+inputd = tempfile.mkdtemp()
+self.cpd = 
tempfile.mkdtemp("test_transform_function_serializer_failure")
+
+def setup():
+conf = SparkConf().set("spark.default.parallelism", 1)
+sc = SparkContext(conf=conf)
+ssc = StreamingContext(s

spark git commit: [SPARK-10558][CORE] Fix wrong executor state in Master

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 9f3e59a16 -> 88875d941


[SPARK-10558][CORE] Fix wrong executor state in Master

`ExecutorAdded` can only be sent to `AppClient` when the worker reports back the 
executor state as `LOADING`; otherwise, because of a concurrency issue, 
`AppClient` may receive `ExecutorAdded` first and then `ExecutorStateUpdated` 
with the `LOADING` state.

Also, Master will change the executor state from `LAUNCHING` to `RUNNING` 
(`AppClient` reports back the state as `RUNNING`) and then to `LOADING` (the 
worker reports back the state as `LOADING`), when it should be `LAUNCHING` -> 
`LOADING` -> `RUNNING`.

It is also shown wrongly in the master UI: the state of the executor should be 
`RUNNING` rather than `LOADING`:

![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png)
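
In the patch itself, `LOADING` is dropped from `ExecutorState` and the Master
asserts that the only legal way into `RUNNING` is from `LAUNCHING`. A minimal
Scala sketch of that check (illustrative only; the real change is in the
Master.scala diff below):

```
object ExecutorStateTransitionSketch extends Enumeration {
  // LOADING is removed; an executor goes LAUNCHING -> RUNNING and then to one
  // of the terminal states.
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

  // Reaching RUNNING from anything other than LAUNCHING indicates the
  // out-of-order reporting described above.
  def checkTransition(oldState: Value, newState: Value): Unit = {
    if (newState == RUNNING) {
      assert(oldState == LAUNCHING,
        s"executor state transfer from $oldState to RUNNING is illegal")
    }
  }
}
```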

Author: jerryshao 

Closes #8714 from jerryshao/SPARK-10558.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88875d94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88875d94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88875d94

Branch: refs/heads/master
Commit: 88875d9413ec7d64a88d40857ffcf97b5853a7f2
Parents: 9f3e59a
Author: jerryshao 
Authored: Wed Nov 25 11:42:53 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:42:53 2015 -0800

--
 .../scala/org/apache/spark/deploy/ExecutorState.scala |  2 +-
 .../org/apache/spark/deploy/client/AppClient.scala|  3 ---
 .../scala/org/apache/spark/deploy/master/Master.scala | 14 +++---
 .../scala/org/apache/spark/deploy/worker/Worker.scala |  2 +-
 4 files changed, 13 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index efa88c6..69c98e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 private[deploy] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 

http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index afab362..df6ba7d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -178,9 +178,6 @@ private[spark] class AppClient(
 val fullId = appId + "/" + id
 logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, 
workerId, hostPort,
   cores))
-// FIXME if changing master and `ExecutorAdded` happen at the same 
time (the order is not
-// guaranteed), `ExecutorStateChanged` may be sent to a dead master.
-sendToMaster(ExecutorStateChanged(appId.get, id, 
ExecutorState.RUNNING, None, None))
 listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
   case ExecutorUpdated(id, state, message, exitStatus) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b25a487..9952c97 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -253,9 +253,17 @@ private[deploy] class Master(
   execOption match {
 case Some(exec) => {
   val appInfo = idToApp(appId)
+  val oldState = exec.state
   exec.state = state
-  if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
+
+  if (state == ExecutorState.RUNNING) {
+assert(oldState == ExecutorState.LAUNCHING,
+  s"executor $execId state transfer from $oldState to RUNNING is 
illegal")
+appInfo.resetRetryCount()
+  }
+
   exec.application.driver.send(ExecutorUpdated(execId, state, message, 
exitStatus))
+
   if (ExecutorState.

spark git commit: [SPARK-10558][CORE] Fix wrong executor state in Master

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 7b720bf1c -> b4cf318ab


[SPARK-10558][CORE] Fix wrong executor state in Master

`ExecutorAdded` can only be sent to `AppClient` when the worker reports back the 
executor state as `LOADING`; otherwise, because of a concurrency issue, 
`AppClient` may receive `ExecutorAdded` first and then `ExecutorStateUpdated` 
with the `LOADING` state.

Also, Master will change the executor state from `LAUNCHING` to `RUNNING` 
(`AppClient` reports back the state as `RUNNING`) and then to `LOADING` (the 
worker reports back the state as `LOADING`), when it should be `LAUNCHING` -> 
`LOADING` -> `RUNNING`.

It is also shown wrongly in the master UI: the state of the executor should be 
`RUNNING` rather than `LOADING`:

![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png)

Author: jerryshao 

Closes #8714 from jerryshao/SPARK-10558.

(cherry picked from commit 88875d9413ec7d64a88d40857ffcf97b5853a7f2)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4cf318a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4cf318a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4cf318a

Branch: refs/heads/branch-1.6
Commit: b4cf318ab9e75f9fb8e4e15f60d263889189b445
Parents: 7b720bf
Author: jerryshao 
Authored: Wed Nov 25 11:42:53 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:43:00 2015 -0800

--
 .../scala/org/apache/spark/deploy/ExecutorState.scala |  2 +-
 .../org/apache/spark/deploy/client/AppClient.scala|  3 ---
 .../scala/org/apache/spark/deploy/master/Master.scala | 14 +++---
 .../scala/org/apache/spark/deploy/worker/Worker.scala |  2 +-
 4 files changed, 13 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b4cf318a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index efa88c6..69c98e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 private[deploy] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b4cf318a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index afab362..df6ba7d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -178,9 +178,6 @@ private[spark] class AppClient(
 val fullId = appId + "/" + id
 logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, 
workerId, hostPort,
   cores))
-// FIXME if changing master and `ExecutorAdded` happen at the same 
time (the order is not
-// guaranteed), `ExecutorStateChanged` may be sent to a dead master.
-sendToMaster(ExecutorStateChanged(appId.get, id, 
ExecutorState.RUNNING, None, None))
 listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
   case ExecutorUpdated(id, state, message, exitStatus) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b4cf318a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b25a487..9952c97 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -253,9 +253,17 @@ private[deploy] class Master(
   execOption match {
 case Some(exec) => {
   val appInfo = idToApp(appId)
+  val oldState = exec.state
   exec.state = state
-  if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
+
+  if (state == ExecutorState.RUNNING) {
+assert(oldState == ExecutorState.LAUNCHING,
+  s"executor $execId state transfer from $oldState to RUNNING is 
illegal")
+appInfo.resetRetryCount()
+  }
+
   exec.appli

spark git commit: [SPARK-11880][WINDOWS][SPARK SUBMIT] bin/load-spark-env.cmd loads spark-env.cmd from wrong directory

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 83653ac5e -> 9f3e59a16


[SPARK-11880][WINDOWS][SPARK SUBMIT] bin/load-spark-env.cmd loads spark-env.cmd 
from wrong directory

* On Windows, `bin/load-spark-env.cmd` tries to load `spark-env.cmd` from 
`%~dp0..\..\conf`, but `~dp0` points to `bin`, so `conf` is only one level up.
* Updated `bin/load-spark-env.cmd` to load `spark-env.cmd` from `%~dp0..\conf` 
instead of `%~dp0..\..\conf`.

Author: wangt 

Closes #9863 from toddwan/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f3e59a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f3e59a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f3e59a1

Branch: refs/heads/master
Commit: 9f3e59a16822fb61d60cf103bd4f7823552939c6
Parents: 83653ac
Author: wangt 
Authored: Wed Nov 25 11:41:05 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:41:05 2015 -0800

--
 bin/load-spark-env.cmd | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9f3e59a1/bin/load-spark-env.cmd
--
diff --git a/bin/load-spark-env.cmd b/bin/load-spark-env.cmd
index 36d932c..59080ed 100644
--- a/bin/load-spark-env.cmd
+++ b/bin/load-spark-env.cmd
@@ -27,7 +27,7 @@ if [%SPARK_ENV_LOADED%] == [] (
   if not [%SPARK_CONF_DIR%] == [] (
 set user_conf_dir=%SPARK_CONF_DIR%
   ) else (
-set user_conf_dir=%~dp0..\..\conf
+set user_conf_dir=%~dp0..\conf
   )
 
   call :LoadSparkEnv





spark git commit: [SPARK-11880][WINDOWS][SPARK SUBMIT] bin/load-spark-env.cmd loads spark-env.cmd from wrong directory

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 97317d346 -> 7b720bf1c


[SPARK-11880][WINDOWS][SPARK SUBMIT] bin/load-spark-env.cmd loads spark-env.cmd 
from wrong directory

* On Windows, `bin/load-spark-env.cmd` tries to load `spark-env.cmd` from 
`%~dp0..\..\conf`, but `~dp0` points to `bin`, so `conf` is only one level up.
* Updated `bin/load-spark-env.cmd` to load `spark-env.cmd` from `%~dp0..\conf` 
instead of `%~dp0..\..\conf`.

Author: wangt 

Closes #9863 from toddwan/master.

(cherry picked from commit 9f3e59a16822fb61d60cf103bd4f7823552939c6)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b720bf1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b720bf1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b720bf1

Branch: refs/heads/branch-1.6
Commit: 7b720bf1c4860fdb560540744c5fdfb333b752bc
Parents: 97317d3
Author: wangt 
Authored: Wed Nov 25 11:41:05 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:41:12 2015 -0800

--
 bin/load-spark-env.cmd | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7b720bf1/bin/load-spark-env.cmd
--
diff --git a/bin/load-spark-env.cmd b/bin/load-spark-env.cmd
index 36d932c..59080ed 100644
--- a/bin/load-spark-env.cmd
+++ b/bin/load-spark-env.cmd
@@ -27,7 +27,7 @@ if [%SPARK_ENV_LOADED%] == [] (
   if not [%SPARK_CONF_DIR%] == [] (
 set user_conf_dir=%SPARK_CONF_DIR%
   ) else (
-set user_conf_dir=%~dp0..\..\conf
+set user_conf_dir=%~dp0..\conf
   )
 
   call :LoadSparkEnv





spark git commit: [SPARK-10864][WEB UI] app name is hidden if window is resized

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 67b673208 -> 83653ac5e


[SPARK-10864][WEB UI] app name is hidden if window is resized

Currently the Web UI navbar has a minimum width of 1200px, so if a window is 
resized smaller than that, the app name goes off screen. The 1200px width seems 
to have been chosen because it fits the longest example app name without wrapping.

To work with smaller window widths I made the tabs wrap since it looked better 
than wrapping the app name. This is a distinct change in how the navbar looks 
and I'm not sure if it's what we actually want to do.

Other notes:
- min-width set to 600px to keep the tabs from wrapping individually (will need 
to be adjusted if tabs are added)
- app name will also wrap (making three levels) if a really really long app 
name is used

Author: Alex Bozarth 

Closes #9874 from ajbozarth/spark10864.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83653ac5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83653ac5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83653ac5

Branch: refs/heads/master
Commit: 83653ac5e71996c5a366a42170bed316b208f1b5
Parents: 67b6732
Author: Alex Bozarth 
Authored: Wed Nov 25 11:39:00 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:39:00 2015 -0800

--
 core/src/main/resources/org/apache/spark/ui/static/webui.css | 8 ++--
 core/src/main/scala/org/apache/spark/ui/UIUtils.scala| 2 +-
 2 files changed, 3 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/83653ac5/core/src/main/resources/org/apache/spark/ui/static/webui.css
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css 
b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 04f3070..c628a0c 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -16,14 +16,9 @@
  */
 
 .navbar {
-  height: 50px;
   font-size: 15px;
   margin-bottom: 15px;
-  min-width: 1200px
-}
-
-.navbar .navbar-inner {
-  height: 50px;
+  min-width: 600px;
 }
 
 .navbar .brand {
@@ -46,6 +41,7 @@
 .navbar-text {
   height: 50px;
   line-height: 3.3;
+  white-space: nowrap;
 }
 
 table.sortable thead {

http://git-wip-us.apache.org/repos/asf/spark/blob/83653ac5/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 84a1116..1e8194f 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -210,10 +210,10 @@ private[spark] object UIUtils extends Logging {
 {org.apache.spark.SPARK_VERSION}
   
 
-{header}
 
   {shortAppName} application UI
 
+{header}
   
 
 





spark git commit: [SPARK-10864][WEB UI] app name is hidden if window is resized

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 448208d0e -> 97317d346


[SPARK-10864][WEB UI] app name is hidden if window is resized

Currently the Web UI navbar has a minimum width of 1200px, so if a window is 
resized smaller than that, the app name goes off screen. The 1200px width seems 
to have been chosen because it fits the longest example app name without wrapping.

To work with smaller window widths I made the tabs wrap since it looked better 
than wrapping the app name. This is a distinct change in how the navbar looks 
and I'm not sure if it's what we actually want to do.

Other notes:
- min-width set to 600px to keep the tabs from wrapping individually (will need 
to be adjusted if tabs are added)
- app name will also wrap (making three levels) if a really really long app 
name is used

Author: Alex Bozarth 

Closes #9874 from ajbozarth/spark10864.

(cherry picked from commit 83653ac5e71996c5a366a42170bed316b208f1b5)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97317d34
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97317d34
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97317d34

Branch: refs/heads/branch-1.6
Commit: 97317d346e9cab6b2f33123a79f33ccd253a99fd
Parents: 448208d
Author: Alex Bozarth 
Authored: Wed Nov 25 11:39:00 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:39:11 2015 -0800

--
 core/src/main/resources/org/apache/spark/ui/static/webui.css | 8 ++--
 core/src/main/scala/org/apache/spark/ui/UIUtils.scala| 2 +-
 2 files changed, 3 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/97317d34/core/src/main/resources/org/apache/spark/ui/static/webui.css
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css 
b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 04f3070..c628a0c 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -16,14 +16,9 @@
  */
 
 .navbar {
-  height: 50px;
   font-size: 15px;
   margin-bottom: 15px;
-  min-width: 1200px
-}
-
-.navbar .navbar-inner {
-  height: 50px;
+  min-width: 600px;
 }
 
 .navbar .brand {
@@ -46,6 +41,7 @@
 .navbar-text {
   height: 50px;
   line-height: 3.3;
+  white-space: nowrap;
 }
 
 table.sortable thead {

http://git-wip-us.apache.org/repos/asf/spark/blob/97317d34/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 84a1116..1e8194f 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -210,10 +210,10 @@ private[spark] object UIUtils extends Logging {
 {org.apache.spark.SPARK_VERSION}
   
 
-{header}
 
   {shortAppName} application UI
 
+{header}
   
 
 





spark git commit: [DOCUMENTATION] Fix minor doc error

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 0dee44a66 -> 67b673208


[DOCUMENTATION] Fix minor doc error

Author: Jeff Zhang 

Closes #9956 from zjffdu/dev_typo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67b67320
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67b67320
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67b67320

Branch: refs/heads/master
Commit: 67b67320884282ccf3102e2af96f877e9b186517
Parents: 0dee44a
Author: Jeff Zhang 
Authored: Wed Nov 25 11:37:42 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:37:42 2015 -0800

--
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/67b67320/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index 4de202d..741d6b2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -35,7 +35,7 @@ val sc = new SparkContext(conf)
 {% endhighlight %}
 
 Note that we can have more than 1 thread in local mode, and in cases like 
Spark Streaming, we may
-actually require one to prevent any sort of starvation issues.
+actually require more than 1 thread to prevent any sort of starvation issues.
 
 Properties that specify some time duration should be configured with a unit of 
time.
 The following format is accepted:





spark git commit: [DOCUMENTATION] Fix minor doc error

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 685b9c2f5 -> 448208d0e


[DOCUMENTATION] Fix minor doc error

Author: Jeff Zhang 

Closes #9956 from zjffdu/dev_typo.

(cherry picked from commit 67b67320884282ccf3102e2af96f877e9b186517)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/448208d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/448208d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/448208d0

Branch: refs/heads/branch-1.6
Commit: 448208d0e86963b7f3ee0e507967512240546871
Parents: 685b9c2
Author: Jeff Zhang 
Authored: Wed Nov 25 11:37:42 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:38:08 2015 -0800

--
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/448208d0/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index c496146..77f1006 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -35,7 +35,7 @@ val sc = new SparkContext(conf)
 {% endhighlight %}
 
 Note that we can have more than 1 thread in local mode, and in cases like 
Spark Streaming, we may
-actually require one to prevent any sort of starvation issues.
+actually require more than 1 thread to prevent any sort of starvation issues.
 
 Properties that specify some time duration should be configured with a unit of 
time.
 The following format is accepted:





spark git commit: [MINOR] Remove unnecessary spaces in `include_example.rb`

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 c7db01b20 -> 685b9c2f5


[MINOR] Remove unnecessary spaces in `include_example.rb`

Author: Yu ISHIKAWA 

Closes #9960 from yu-iskw/minor-remove-spaces.

(cherry picked from commit 0dee44a6646daae0cc03dbc32125e080dff0f4ae)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/685b9c2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/685b9c2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/685b9c2f

Branch: refs/heads/branch-1.6
Commit: 685b9c2f5a68cc77e9494a39b0cb25b6c0b239b3
Parents: c7db01b
Author: Yu ISHIKAWA 
Authored: Wed Nov 25 11:35:52 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:36:06 2015 -0800

--
 docs/_plugins/include_example.rb | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/685b9c2f/docs/_plugins/include_example.rb
--
diff --git a/docs/_plugins/include_example.rb b/docs/_plugins/include_example.rb
index 549f81f..564c866 100644
--- a/docs/_plugins/include_example.rb
+++ b/docs/_plugins/include_example.rb
@@ -20,12 +20,12 @@ require 'pygments'
 
 module Jekyll
   class IncludeExampleTag < Liquid::Tag
-
+
 def initialize(tag_name, markup, tokens)
   @markup = markup
   super
 end
- 
+
 def render(context)
   site = context.registers[:site]
   config_dir = '../examples/src/main'
@@ -37,7 +37,7 @@ module Jekyll
 
   code = File.open(@file).read.encode("UTF-8")
   code = select_lines(code)
- 
+
   rendered_code = Pygments.highlight(code, :lexer => @lang)
 
   hint = "Find full example code at " \
@@ -45,7 +45,7 @@ module Jekyll
 
   rendered_code + hint
 end
- 
+
 # Trim the code block so as to have the same indention, regardless of 
their positions in the
 # code file.
 def trim_codeblock(lines)





spark git commit: [MINOR] Remove unnecessary spaces in `include_example.rb`

2015-11-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master dc1d324fd -> 0dee44a66


[MINOR] Remove unnecessary spaces in `include_example.rb`

Author: Yu ISHIKAWA 

Closes #9960 from yu-iskw/minor-remove-spaces.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0dee44a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0dee44a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0dee44a6

Branch: refs/heads/master
Commit: 0dee44a6646daae0cc03dbc32125e080dff0f4ae
Parents: dc1d324
Author: Yu ISHIKAWA 
Authored: Wed Nov 25 11:35:52 2015 -0800
Committer: Andrew Or 
Committed: Wed Nov 25 11:35:52 2015 -0800

--
 docs/_plugins/include_example.rb | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0dee44a6/docs/_plugins/include_example.rb
--
diff --git a/docs/_plugins/include_example.rb b/docs/_plugins/include_example.rb
index 549f81f..564c866 100644
--- a/docs/_plugins/include_example.rb
+++ b/docs/_plugins/include_example.rb
@@ -20,12 +20,12 @@ require 'pygments'
 
 module Jekyll
   class IncludeExampleTag < Liquid::Tag
-
+
 def initialize(tag_name, markup, tokens)
   @markup = markup
   super
 end
- 
+
 def render(context)
   site = context.registers[:site]
   config_dir = '../examples/src/main'
@@ -37,7 +37,7 @@ module Jekyll
 
   code = File.open(@file).read.encode("UTF-8")
   code = select_lines(code)
- 
+
   rendered_code = Pygments.highlight(code, :lexer => @lang)
 
   hint = "Find full example code at " \
@@ -45,7 +45,7 @@ module Jekyll
 
   rendered_code + hint
 end
- 
+
 # Trim the code block so as to have the same indention, regardless of 
their positions in the
 # code file.
 def trim_codeblock(lines)





spark git commit: [SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for pyspark

2015-11-25 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d5145210b -> c7db01b20


[SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for pyspark

Currently, we do not have visualization for SQL queries from Python; this PR 
fixes that.

cc zsxwing

Author: Davies Liu 

Closes #9949 from davies/pyspark_sql_ui.

(cherry picked from commit dc1d324fdf83e9f4b1debfb277533b002691d71f)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7db01b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7db01b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7db01b2

Branch: refs/heads/branch-1.6
Commit: c7db01b20b97352697ffb04aee869bea44d8f932
Parents: d514521
Author: Davies Liu 
Authored: Wed Nov 25 11:11:39 2015 -0800
Committer: Davies Liu 
Committed: Wed Nov 25 11:11:51 2015 -0800

--
 python/pyspark/sql/dataframe.py |  2 +-
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala |  7 +++
 .../scala/org/apache/spark/sql/execution/python.scala   | 12 +++-
 3 files changed, 15 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c7db01b2/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0dd75ba..746bb55 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -277,7 +277,7 @@ class DataFrame(object):
 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
 """
 with SCCallSiteSync(self._sc) as css:
-port = 
self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
+port = self._jdf.collectToPython()
 return list(_load_from_socket(port, 
BatchedSerializer(PickleSerializer(
 
 @ignore_unicode_prefix

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db01b2/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d8319b9..6197f10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.python.PythonRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
@@ -1735,6 +1736,12 @@ class DataFrame private[sql](
 EvaluatePython.javaToPython(rdd)
   }
 
+  protected[sql] def collectToPython(): Int = {
+withNewExecutionId {
+  PythonRDD.collectAndServe(javaToPython.rdd)
+}
+  }
+
   
   
   // Deprecated methods

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db01b2/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index d611b00..defcec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -121,11 +121,13 @@ object EvaluatePython {
 
   def takeAndServe(df: DataFrame, n: Int): Int = {
 registerPicklers()
-val iter = new SerDeUtil.AutoBatchedPickler(
-  df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
-EvaluatePython.toJava(row, df.schema)
-  })
-PythonRDD.serveIterator(iter, s"serve-DataFrame")
+df.withNewExecutionId {
+  val iter = new SerDeUtil.AutoBatchedPickler(
+df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
+  EvaluatePython.toJava(row, df.schema)
+})
+  PythonRDD.serveIterator(iter, s"serve-DataFrame")
+}
   }
 
   /**





spark git commit: [SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for pyspark

2015-11-25 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 6b781576a -> dc1d324fd


[SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for pyspark

Currently, we do not have visualization for SQL queries from Python; this PR 
fixes that.

cc zsxwing

Author: Davies Liu 

Closes #9949 from davies/pyspark_sql_ui.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc1d324f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc1d324f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc1d324f

Branch: refs/heads/master
Commit: dc1d324fdf83e9f4b1debfb277533b002691d71f
Parents: 6b78157
Author: Davies Liu 
Authored: Wed Nov 25 11:11:39 2015 -0800
Committer: Davies Liu 
Committed: Wed Nov 25 11:11:39 2015 -0800

--
 python/pyspark/sql/dataframe.py |  2 +-
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala |  7 +++
 .../scala/org/apache/spark/sql/execution/python.scala   | 12 +++-
 3 files changed, 15 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dc1d324f/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0dd75ba..746bb55 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -277,7 +277,7 @@ class DataFrame(object):
 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
 """
 with SCCallSiteSync(self._sc) as css:
-port = 
self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
+port = self._jdf.collectToPython()
 return list(_load_from_socket(port, 
BatchedSerializer(PickleSerializer(
 
 @ignore_unicode_prefix

http://git-wip-us.apache.org/repos/asf/spark/blob/dc1d324f/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d8319b9..6197f10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.python.PythonRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
@@ -1735,6 +1736,12 @@ class DataFrame private[sql](
 EvaluatePython.javaToPython(rdd)
   }
 
+  protected[sql] def collectToPython(): Int = {
+withNewExecutionId {
+  PythonRDD.collectAndServe(javaToPython.rdd)
+}
+  }
+
   
   
   // Deprecated methods

http://git-wip-us.apache.org/repos/asf/spark/blob/dc1d324f/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index d611b00..defcec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -121,11 +121,13 @@ object EvaluatePython {
 
   def takeAndServe(df: DataFrame, n: Int): Int = {
 registerPicklers()
-val iter = new SerDeUtil.AutoBatchedPickler(
-  df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
-EvaluatePython.toJava(row, df.schema)
-  })
-PythonRDD.serveIterator(iter, s"serve-DataFrame")
+df.withNewExecutionId {
+  val iter = new SerDeUtil.AutoBatchedPickler(
+df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
+  EvaluatePython.toJava(row, df.schema)
+})
+  PythonRDD.serveIterator(iter, s"serve-DataFrame")
+}
   }
 
   /**





spark git commit: [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 204f3601d -> d5145210b


[SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits

Deleting the temp dirs that way runs into the following problem:

```

scala> import scala.collection.mutable
import scala.collection.mutable

scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9)
a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98)

scala> a.foreach(x => {a.remove(x) })

scala> a.foreach(println(_))
98
```

You may not modify a collection while traversing or iterating over it. This 
cannot delete all elements of the collection.
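
The patch below avoids this by materializing the paths into an Array before
iterating, so removals performed during the traversal no longer affect the
collection being walked. A small self-contained Scala illustration of the same
idea (plain Scala, nothing Spark-specific):

```
import scala.collection.mutable

object SnapshotIterationDemo {
  def main(args: Array[String]): Unit = {
    val a = mutable.Set(1, 2, 3, 4, 7, 0, 8, 98, 9)
    // Snapshot the set into an Array first, then mutate the original set.
    a.toArray.foreach(x => a.remove(x))
    println(a.isEmpty) // prints: true, every element was removed
  }
}
```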

Author: Zhongshuai Pei 

Closes #9951 from DoingDone9/Bug_RemainDir.

(cherry picked from commit 6b781576a15d8d5c5fbed8bef1c5bda95b3d44ac)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5145210
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5145210
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5145210

Branch: refs/heads/branch-1.6
Commit: d5145210bd59072a33b61f15348d5e794f6df4e0
Parents: 204f360
Author: Zhongshuai Pei 
Authored: Wed Nov 25 10:37:34 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 10:37:42 2015 -0800

--
 .../main/scala/org/apache/spark/util/ShutdownHookManager.scala   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d5145210/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index db4a8b3..4012dca 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging {
   // Add a shutdown hook to delete the temp dirs when the JVM exits
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
 logInfo("Shutdown hook called")
-shutdownDeletePaths.foreach { dirPath =>
+// we need to materialize the paths to delete because deleteRecursively 
removes items from
+// shutdownDeletePaths as we are traversing through it.
+shutdownDeletePaths.toArray.foreach { dirPath =>
   try {
 logInfo("Deleting directory " + dirPath)
 Utils.deleteRecursively(new File(dirPath))





spark git commit: [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 94789f374 -> 1df3e8230


[SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits

Deleting the temp dirs that way runs into the following problem:

```

scala> import scala.collection.mutable
import scala.collection.mutable

scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9)
a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98)

scala> a.foreach(x => {a.remove(x) })

scala> a.foreach(println(_))
98
```

You may not modify a collection while traversing or iterating over it. This 
cannot delete all elements of the collection.

Author: Zhongshuai Pei 

Closes #9951 from DoingDone9/Bug_RemainDir.

(cherry picked from commit 6b781576a15d8d5c5fbed8bef1c5bda95b3d44ac)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1df3e823
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1df3e823
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1df3e823

Branch: refs/heads/branch-1.4
Commit: 1df3e823028c0f1e22e2e61f959ef0343355a757
Parents: 94789f3
Author: Zhongshuai Pei 
Authored: Wed Nov 25 10:37:34 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 10:38:02 2015 -0800

--
 .../main/scala/org/apache/spark/util/ShutdownHookManager.scala   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1df3e823/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 61ff9b8..091dc03 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging {
   // Add a shutdown hook to delete the temp dirs when the JVM exits
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
 logInfo("Shutdown hook called")
-shutdownDeletePaths.foreach { dirPath =>
+// we need to materialize the paths to delete because deleteRecursively 
removes items from
+// shutdownDeletePaths as we are traversing through it.
+shutdownDeletePaths.toArray.foreach { dirPath =>
   try {
 logInfo("Deleting directory " + dirPath)
 Utils.deleteRecursively(new File(dirPath))





spark git commit: [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4139a4ed1 -> b1fcefca6


[SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits

Deleting the temp dirs that way runs into the following problem:

```

scala> import scala.collection.mutable
import scala.collection.mutable

scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9)
a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98)

scala> a.foreach(x => {a.remove(x) })

scala> a.foreach(println(_))
98
```

You may not modify a collection while traversing or iterating over it. This 
cannot delete all elements of the collection.

Author: Zhongshuai Pei 

Closes #9951 from DoingDone9/Bug_RemainDir.

(cherry picked from commit 6b781576a15d8d5c5fbed8bef1c5bda95b3d44ac)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1fcefca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1fcefca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1fcefca

Branch: refs/heads/branch-1.5
Commit: b1fcefca6b257f5bb1be85fce2b4a97d4271e0aa
Parents: 4139a4e
Author: Zhongshuai Pei 
Authored: Wed Nov 25 10:37:34 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 10:37:51 2015 -0800

--
 .../main/scala/org/apache/spark/util/ShutdownHookManager.scala   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1fcefca/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 61ff9b8..091dc03 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging {
   // Add a shutdown hook to delete the temp dirs when the JVM exits
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
 logInfo("Shutdown hook called")
-shutdownDeletePaths.foreach { dirPath =>
+// we need to materialize the paths to delete because deleteRecursively 
removes items from
+// shutdownDeletePaths as we are traversing through it.
+shutdownDeletePaths.toArray.foreach { dirPath =>
   try {
 logInfo("Deleting directory " + dirPath)
 Utils.deleteRecursively(new File(dirPath))





spark git commit: [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master faabdfa2b -> 6b781576a


[SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits

Deleting the temp dirs that way runs into the following problem:

```

scala> import scala.collection.mutable
import scala.collection.mutable

scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9)
a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98)

scala> a.foreach(x => {a.remove(x) })

scala> a.foreach(println(_))
98
```

You may not modify a collection while traversing or iterating over it. This 
cannot delete all elements of the collection.

Author: Zhongshuai Pei 

Closes #9951 from DoingDone9/Bug_RemainDir.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b781576
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b781576
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b781576

Branch: refs/heads/master
Commit: 6b781576a15d8d5c5fbed8bef1c5bda95b3d44ac
Parents: faabdfa
Author: Zhongshuai Pei 
Authored: Wed Nov 25 10:37:34 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 10:37:34 2015 -0800

--
 .../main/scala/org/apache/spark/util/ShutdownHookManager.scala   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6b781576/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index db4a8b3..4012dca 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging {
   // Add a shutdown hook to delete the temp dirs when the JVM exits
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
 logInfo("Shutdown hook called")
-shutdownDeletePaths.foreach { dirPath =>
+// we need to materialize the paths to delete because deleteRecursively 
removes items from
+// shutdownDeletePaths as we are traversing through it.
+shutdownDeletePaths.toArray.foreach { dirPath =>
   try {
 logInfo("Deleting directory " + dirPath)
 Utils.deleteRecursively(new File(dirPath))





spark git commit: [SPARK-11984][SQL][PYTHON] Fix typos in doc for pivot for scala and python

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master c1f85fc71 -> faabdfa2b


[SPARK-11984][SQL][PYTHON] Fix typos in doc for pivot for scala and python

Author: felixcheung 

Closes #9967 from felixcheung/pypivotdoc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/faabdfa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/faabdfa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/faabdfa2

Branch: refs/heads/master
Commit: faabdfa2bd416ae514961535f1953e8e9e8b1f3f
Parents: c1f85fc
Author: felixcheung 
Authored: Wed Nov 25 10:36:35 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 10:36:35 2015 -0800

--
 python/pyspark/sql/group.py| 6 +++---
 sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/faabdfa2/python/pyspark/sql/group.py
--
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index d8ed7eb..1911588 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -169,11 +169,11 @@ class GroupedData(object):
 
 @since(1.6)
 def pivot(self, pivot_col, values=None):
-"""Pivots a column of the current DataFrame and preform the specified 
aggregation.
+"""Pivots a column of the current DataFrame and perform the specified 
aggregation.
 
 :param pivot_col: Column to pivot
-:param values: Optional list of values of pivotColumn that will be 
translated to columns in
-the output data frame. If values are not provided the method with 
do an immediate call
+:param values: Optional list of values of pivot column that will be 
translated to columns in
+the output DataFrame. If values are not provided the method will 
do an immediate call
 to .distinct() on the pivot column.
 
 >>> df4.groupBy("year").pivot("course", ["dotNET", 
"Java"]).sum("earnings").collect()

http://git-wip-us.apache.org/repos/asf/spark/blob/faabdfa2/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index abd531c..13341a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -282,7 +282,7 @@ class GroupedData protected[sql](
   }
 
   /**
-   * Pivots a column of the current [[DataFrame]] and preform the specified 
aggregation.
+   * Pivots a column of the current [[DataFrame]] and perform the specified 
aggregation.
* There are two versions of pivot function: one that requires the caller to 
specify the list
* of distinct values to pivot on, and one that does not. The latter is more 
concise but less
* efficient, because Spark needs to first compute the list of distinct 
values internally.
@@ -321,7 +321,7 @@ class GroupedData protected[sql](
   }
 
   /**
-   * Pivots a column of the current [[DataFrame]] and preform the specified 
aggregation.
+   * Pivots a column of the current [[DataFrame]] and perform the specified 
aggregation.
* There are two versions of pivot function: one that requires the caller to 
specify the list
* of distinct values to pivot on, and one that does not. The latter is more 
concise but less
* efficient, because Spark needs to first compute the list of distinct 
values internally.
@@ -353,7 +353,7 @@ class GroupedData protected[sql](
   }
 
   /**
-   * Pivots a column of the current [[DataFrame]] and preform the specified 
aggregation.
+   * Pivots a column of the current [[DataFrame]] and perform the specified 
aggregation.
* There are two versions of pivot function: one that requires the caller to 
specify the list
* of distinct values to pivot on, and one that does not. The latter is more 
concise but less
* efficient, because Spark needs to first compute the list of distinct 
values internally.
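
For reference, the two pivot variants described in the doc text above look like
this in Scala. This is a sketch: `df` is assumed to be a DataFrame with `year`,
`course`, and `earnings` columns, mirroring the example in the Python docstring.

```
// Caller supplies the distinct pivot values up front: more efficient, because
// Spark does not have to compute the distinct values of "course" itself.
val explicitValues = df.groupBy("year")
  .pivot("course", Seq("dotNET", "Java"))
  .sum("earnings")

// Values omitted: Spark first computes the distinct values of "course"
// internally, then pivots on them (more concise but less efficient).
val inferredValues = df.groupBy("year")
  .pivot("course")
  .sum("earnings")
```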





spark git commit: [SPARK-11984][SQL][PYTHON] Fix typos in doc for pivot for scala and python

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 400f66f7c -> 204f3601d


[SPARK-11984][SQL][PYTHON] Fix typos in doc for pivot for scala and python

Author: felixcheung 

Closes #9967 from felixcheung/pypivotdoc.

(cherry picked from commit faabdfa2bd416ae514961535f1953e8e9e8b1f3f)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/204f3601
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/204f3601
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/204f3601

Branch: refs/heads/branch-1.6
Commit: 204f3601d81502fcd879cfd23376475443eedf30
Parents: 400f66f
Author: felixcheung 
Authored: Wed Nov 25 10:36:35 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 10:36:59 2015 -0800

--
 python/pyspark/sql/group.py| 6 +++---
 sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/204f3601/python/pyspark/sql/group.py
--
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index d8ed7eb..1911588 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -169,11 +169,11 @@ class GroupedData(object):
 
 @since(1.6)
 def pivot(self, pivot_col, values=None):
-"""Pivots a column of the current DataFrame and preform the specified aggregation.
+"""Pivots a column of the current DataFrame and perform the specified aggregation.
 
 :param pivot_col: Column to pivot
-:param values: Optional list of values of pivotColumn that will be translated to columns in
-the output data frame. If values are not provided the method with do an immediate call
+:param values: Optional list of values of pivot column that will be translated to columns in
+the output DataFrame. If values are not provided the method will do an immediate call
 to .distinct() on the pivot column.
 
 >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()

http://git-wip-us.apache.org/repos/asf/spark/blob/204f3601/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index abd531c..13341a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -282,7 +282,7 @@ class GroupedData protected[sql](
   }
 
   /**
-   * Pivots a column of the current [[DataFrame]] and preform the specified aggregation.
+   * Pivots a column of the current [[DataFrame]] and perform the specified aggregation.
   * There are two versions of pivot function: one that requires the caller to specify the list
   * of distinct values to pivot on, and one that does not. The latter is more concise but less
   * efficient, because Spark needs to first compute the list of distinct values internally.
@@ -321,7 +321,7 @@ class GroupedData protected[sql](
   }
 
   /**
-   * Pivots a column of the current [[DataFrame]] and preform the specified aggregation.
+   * Pivots a column of the current [[DataFrame]] and perform the specified aggregation.
   * There are two versions of pivot function: one that requires the caller to specify the list
   * of distinct values to pivot on, and one that does not. The latter is more concise but less
   * efficient, because Spark needs to first compute the list of distinct values internally.
@@ -353,7 +353,7 @@ class GroupedData protected[sql](
   }
 
   /**
-   * Pivots a column of the current [[DataFrame]] and preform the specified aggregation.
+   * Pivots a column of the current [[DataFrame]] and perform the specified aggregation.
   * There are two versions of pivot function: one that requires the caller to specify the list
   * of distinct values to pivot on, and one that does not. The latter is more concise but less
   * efficient, because Spark needs to first compute the list of distinct values internally.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[2/2] spark git commit: [SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.

2015-11-25 Thread vanzin
[SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.

- NettyRpcEnv::openStream() now correctly propagates errors to
  the read side of the pipe.
- NettyStreamManager now throws if the file being transferred does
  not exist.
- The network library now correctly handles zero-sized streams.

Author: Marcelo Vanzin 

Closes #9941 from vanzin/SPARK-11956.

(cherry picked from commit c1f85fc71e71e07534b89c84677d977bb20994f8)
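
As a rough illustration of the first bullet above, the sketch below shows the
error-propagation pattern in standalone form: the writer side records the failure and
closes the source so that a blocked reader wakes up and rethrows the stored error on its
next read. It mirrors the NettyRpcEnv change in the diff below, but as a toy class with
assumed names, not the actual Spark code:

    import java.nio.ByteBuffer
    import java.nio.channels.ReadableByteChannel
    import scala.util.{Failure, Success, Try}

    class ErrorPropagatingChannel(source: ReadableByteChannel) {
      @volatile private var error: Throwable = _

      // Writer side: remember the failure and close the source to unblock readers.
      def setError(e: Throwable): Unit = {
        error = e
        source.close()
      }

      // Reader side: surface the stored failure instead of a silent end-of-stream.
      def read(dst: ByteBuffer): Int = {
        val result = if (error == null) Try(source.read(dst)) else Failure(error)
        result match {
          case Success(bytesRead) => bytesRead
          case Failure(e) => throw e
        }
      }
    }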


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/400f66f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/400f66f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/400f66f7

Branch: refs/heads/branch-1.6
Commit: 400f66f7c43f608444fb87505e1a789879928360
Parents: c8f26d2
Author: Marcelo Vanzin 
Authored: Wed Nov 25 09:47:20 2015 -0800
Committer: Marcelo Vanzin 
Committed: Wed Nov 25 10:00:50 2015 -0800

--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 19 +
 .../spark/rpc/netty/NettyStreamManager.scala|  2 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala  | 27 ++-
 .../client/TransportResponseHandler.java| 28 +---
 .../org/apache/spark/network/StreamSuite.java   | 23 +++-
 5 files changed, 75 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/400f66f7/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 7495f3e..98ce3d6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -27,7 +27,7 @@ import javax.annotation.Nullable
 
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
-import scala.util.{DynamicVariable, Failure, Success}
+import scala.util.{DynamicVariable, Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, HttpFileServer, SecurityManager, SparkConf}
@@ -375,13 +375,22 @@ private[netty] class NettyRpcEnv(
 
 @volatile private var error: Throwable = _
 
-def setError(e: Throwable): Unit = error = e
+def setError(e: Throwable): Unit = {
+  error = e
+  source.close()
+}
 
 override def read(dst: ByteBuffer): Int = {
-  if (error != null) {
-throw error
+  val result = if (error == null) {
+Try(source.read(dst))
+  } else {
+Failure(error)
+  }
+
+  result match {
+case Success(bytesRead) => bytesRead
+case Failure(error) => throw error
   }
-  source.read(dst)
 }
 
 override def close(): Unit = source.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/400f66f7/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index eb1d260..a2768b4 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -44,7 +44,7 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
  case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype")
 }
 
-require(file != null, s"File not found: $streamId")
+require(file != null && file.isFile(), s"File not found: $streamId")
 new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/400f66f7/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala 
b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 2b664c6..6cc958a 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -729,23 +729,36 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 val tempDir = Utils.createTempDir()
 val file = new File(tempDir, "file")
 Files.write(UUID.randomUUID().toString(), file, UTF_8)
+val empty = new File(tempDir, "empty")
+Files.write("", empty, UTF_8);
 val jar = new File(tempDir, "jar")
 Files.write(UUID.randomUUID().toString(), jar, UTF_8)
 
 val fileUri = env.fileServer.addFile(file)
+val emptyUri = env.fileServer.addFile(empty)
 val jarUri = env.fileServer.addJar(jar)

[1/2] spark git commit: [SPARK-11762][NETWORK] Account for active streams when counting outstanding requests.

2015-11-25 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 2aeee5696 -> 400f66f7c


[SPARK-11762][NETWORK] Account for active streams when counting outstanding requests.

This way the timeout handling code can correctly close "hung" channels that are
processing streams.

Author: Marcelo Vanzin 

Closes #9747 from vanzin/SPARK-11762.

(cherry picked from commit 5231cd5acaae69d735ba3209531705cc222f3cfb)
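
The idea, in a simplified sketch (illustrative names only; the real change lives in
TransportResponseHandler in the diff below): an in-flight stream counts as one
outstanding request, so the idle-timeout handler will not close a channel that is still
receiving stream data.

    // Toy version of the outstanding-request count with a stream flag folded in.
    def numOutstandingRequests(
        outstandingFetches: Int,
        outstandingRpcs: Int,
        queuedStreamCallbacks: Int,
        streamActive: Boolean): Int = {
      outstandingFetches + outstandingRpcs + queuedStreamCallbacks +
        (if (streamActive) 1 else 0)
    }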


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8f26d2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8f26d2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8f26d2e

Branch: refs/heads/branch-1.6
Commit: c8f26d2e818f935add8b34ef58de7c7cac2a60af
Parents: 2aeee56
Author: Marcelo Vanzin 
Authored: Mon Nov 23 10:45:23 2015 -0800
Committer: Marcelo Vanzin 
Committed: Wed Nov 25 10:00:45 2015 -0800

--
 .../spark/network/client/StreamInterceptor.java | 12 -
 .../client/TransportResponseHandler.java| 15 +--
 .../network/TransportResponseHandlerSuite.java  | 27 
 3 files changed, 51 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c8f26d2e/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
--
diff --git 
a/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
 
b/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
index 02230a0..88ba3cc 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
@@ -30,13 +30,19 @@ import org.apache.spark.network.util.TransportFrameDecoder;
  */
 class StreamInterceptor implements TransportFrameDecoder.Interceptor {
 
+  private final TransportResponseHandler handler;
   private final String streamId;
   private final long byteCount;
   private final StreamCallback callback;
 
   private volatile long bytesRead;
 
-  StreamInterceptor(String streamId, long byteCount, StreamCallback callback) {
+  StreamInterceptor(
+  TransportResponseHandler handler,
+  String streamId,
+  long byteCount,
+  StreamCallback callback) {
+this.handler = handler;
 this.streamId = streamId;
 this.byteCount = byteCount;
 this.callback = callback;
@@ -45,11 +51,13 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor {
 
   @Override
   public void exceptionCaught(Throwable cause) throws Exception {
+handler.deactivateStream();
 callback.onFailure(streamId, cause);
   }
 
   @Override
   public void channelInactive() throws Exception {
+handler.deactivateStream();
 callback.onFailure(streamId, new ClosedChannelException());
   }
 
@@ -65,8 +73,10 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor {
   RuntimeException re = new IllegalStateException(String.format(
 "Read too many bytes? Expected %d, but read %d.", byteCount, bytesRead));
   callback.onFailure(streamId, re);
+  handler.deactivateStream();
   throw re;
 } else if (bytesRead == byteCount) {
+  handler.deactivateStream();
   callback.onComplete(streamId);
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c8f26d2e/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
--
diff --git 
a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
 
b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index fc7bdde..be181e0 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ public class TransportResponseHandler extends MessageHandler {
   private final Map outstandingRpcs;
 
   private final Queue streamCallbacks;
+  private volatile boolean streamActive;
 
   /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
   private final AtomicLong timeOfLastRequestNs;
@@ -87,9 +89,15 @@ public class TransportResponseHandler extends MessageHandler {
   }
 
   public void addStreamCallback(StreamCallback callback) {
+timeOfLastRequestNs.s

spark git commit: [SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.

2015-11-25 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 0a5aef753 -> c1f85fc71


[SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.

- NettyRpcEnv::openStream() now correctly propagates errors to
  the read side of the pipe.
- NettyStreamManager now throws if the file being transferred does
  not exist.
- The network library now correctly handles zero-sized streams.

Author: Marcelo Vanzin 

Closes #9941 from vanzin/SPARK-11956.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1f85fc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1f85fc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1f85fc7

Branch: refs/heads/master
Commit: c1f85fc71e71e07534b89c84677d977bb20994f8
Parents: 0a5aef7
Author: Marcelo Vanzin 
Authored: Wed Nov 25 09:47:20 2015 -0800
Committer: Marcelo Vanzin 
Committed: Wed Nov 25 09:47:20 2015 -0800

--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 19 +
 .../spark/rpc/netty/NettyStreamManager.scala|  2 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala  | 27 ++-
 .../client/TransportResponseHandler.java| 28 +---
 .../org/apache/spark/network/StreamSuite.java   | 23 +++-
 5 files changed, 75 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c1f85fc7/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 68701f6..c8fa870 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -27,7 +27,7 @@ import javax.annotation.Nullable
 
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
-import scala.util.{DynamicVariable, Failure, Success}
+import scala.util.{DynamicVariable, Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -368,13 +368,22 @@ private[netty] class NettyRpcEnv(
 
 @volatile private var error: Throwable = _
 
-def setError(e: Throwable): Unit = error = e
+def setError(e: Throwable): Unit = {
+  error = e
+  source.close()
+}
 
 override def read(dst: ByteBuffer): Int = {
-  if (error != null) {
-throw error
+  val result = if (error == null) {
+Try(source.read(dst))
+  } else {
+Failure(error)
+  }
+
+  result match {
+case Success(bytesRead) => bytesRead
+case Failure(error) => throw error
   }
-  source.read(dst)
 }
 
 override def close(): Unit = source.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/c1f85fc7/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index eb1d260..a2768b4 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -44,7 +44,7 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
  case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype")
 }
 
-require(file != null, s"File not found: $streamId")
+require(file != null && file.isFile(), s"File not found: $streamId")
 new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1f85fc7/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala 
b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 2b664c6..6cc958a 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -729,23 +729,36 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 val tempDir = Utils.createTempDir()
 val file = new File(tempDir, "file")
 Files.write(UUID.randomUUID().toString(), file, UTF_8)
+val empty = new File(tempDir, "empty")
+Files.write("", empty, UTF_8);
 val jar = new File(tempDir, "jar")
 Files.write(UUID.randomUUID().toString(), jar, UTF_8)
 
 val fileUri = env.fileServer.addFile(file)
+val emptyUri = env.fileServer.addFile(empty)
 val jarUri = env.fileServer.addJar(jar)
 

spark git commit: [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage

2015-11-25 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 27b5f31a0 -> 4139a4ed1


[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a 
Stage

This issue was addressed in https://github.com/apache/spark/pull/5494, but the 
fix in that PR, while safe in the sense that it will prevent the SparkContext 
from shutting down, misses the actual bug.  The intent of `submitMissingTasks` 
should be understood as "submit the Tasks that are missing for the Stage, and 
run them as part of the ActiveJob identified by jobId".  Because of a 
long-standing bug, the `jobId` parameter was never being used.  Instead, we 
were trying to use the jobId with which the Stage was created -- which may no 
longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId 
parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed 
since 1.0.

kayousterhout pankajarora12

Author: Mark Hamstra 
Author: Imran Rashid 

Closes #6291 from markhamstra/SPARK-6880.

(cherry picked from commit 0a5aef753e70e93d7e56054f354a52e4d4e18932)
Signed-off-by: Imran Rashid 
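
A toy illustration of why the old lookup could silently drop the job's properties
(hypothetical values, not Spark code):

    // The stage was first created by job 1, which has since finished, and is now
    // being resubmitted as part of job 2.
    val jobIdToActiveJob = Map(2 -> "propertiesOfJob2")
    val stageFirstJobId = 1
    val submittedJobId = 2

    // Old behaviour: the lookup misses and falls back to null, losing the scheduling
    // pool, job group and description settings.
    val oldProperties = jobIdToActiveJob.get(stageFirstJobId).orNull
    // Fixed behaviour: use the ActiveJob for the jobId passed to submitMissingTasks.
    val newProperties = jobIdToActiveJob(submittedJobId)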


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4139a4ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4139a4ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4139a4ed

Branch: refs/heads/branch-1.5
Commit: 4139a4ed1f024b1a88e4efe613acf45175f82318
Parents: 27b5f31
Author: Mark Hamstra 
Authored: Wed Nov 25 09:34:34 2015 -0600
Committer: Imran Rashid 
Committed: Wed Nov 25 09:38:34 2015 -0600

--
 .../apache/spark/scheduler/DAGScheduler.scala   |   6 +-
 .../spark/scheduler/DAGSchedulerSuite.scala | 107 ++-
 2 files changed, 109 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4139a4ed/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index cb2494d..c8f996c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -810,7 +810,9 @@ class DAGScheduler(
   stage.resetInternalAccumulators()
 }
 
-val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+// with this Stage
+val properties = jobIdToActiveJob(jobId).properties
 
 runningStages += stage
 // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -905,7 +907,7 @@ class DAGScheduler(
   stage.pendingTasks ++= tasks
   logDebug("New pending tasks: " + stage.pendingTasks)
   taskScheduler.submitTasks(new TaskSet(
-tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
   stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
 } else {
   // Because we posted SparkListenerStageSubmitted earlier, we should mark

http://git-wip-us.apache.org/repos/asf/spark/blob/4139a4ed/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7232970..88745f2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Properties
+
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -234,9 +236,10 @@ class DAGSchedulerSuite
   rdd: RDD[_],
   partitions: Array[Int],
   func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
-  listener: JobListener = jobListener): Int = {
+  listener: JobListener = jobListener,
+  properties: Properties = null): Int = {
 val jobId = scheduler.nextJobId.getAndIncrement()
-runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
+runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
 jobId
   }
 
@@ -750,6 +753,106 @@ class DAGSchedulerSuite
 assertDataStructuresEmpty()
   }
 

spark git commit: [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage

2015-11-25 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4971eaaa5 -> 2aeee5696


[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a 
Stage

This issue was addressed in https://github.com/apache/spark/pull/5494, but the 
fix in that PR, while safe in the sense that it will prevent the SparkContext 
from shutting down, misses the actual bug.  The intent of `submitMissingTasks` 
should be understood as "submit the Tasks that are missing for the Stage, and 
run them as part of the ActiveJob identified by jobId".  Because of a 
long-standing bug, the `jobId` parameter was never being used.  Instead, we 
were trying to use the jobId with which the Stage was created -- which may no 
longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId 
parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed 
since 1.0.

kayousterhout pankajarora12

Author: Mark Hamstra 
Author: Imran Rashid 

Closes #6291 from markhamstra/SPARK-6880.

(cherry picked from commit 0a5aef753e70e93d7e56054f354a52e4d4e18932)
Signed-off-by: Imran Rashid 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aeee569
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aeee569
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aeee569

Branch: refs/heads/branch-1.6
Commit: 2aeee56961b27984fa43253bd63ac94b06162bfb
Parents: 4971eaa
Author: Mark Hamstra 
Authored: Wed Nov 25 09:34:34 2015 -0600
Committer: Imran Rashid 
Committed: Wed Nov 25 09:34:56 2015 -0600

--
 .../apache/spark/scheduler/DAGScheduler.scala   |   6 +-
 .../spark/scheduler/DAGSchedulerSuite.scala | 107 ++-
 2 files changed, 109 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2aeee569/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 77a184d..e01a960 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -946,7 +946,9 @@ class DAGScheduler(
   stage.resetInternalAccumulators()
 }
 
-val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+// with this Stage
+val properties = jobIdToActiveJob(jobId).properties
 
 runningStages += stage
 // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -1047,7 +1049,7 @@ class DAGScheduler(
   stage.pendingPartitions ++= tasks.map(_.partitionId)
   logDebug("New pending partitions: " + stage.pendingPartitions)
   taskScheduler.submitTasks(new TaskSet(
-tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
   stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
 } else {
   // Because we posted SparkListenerStageSubmitted earlier, we should mark

http://git-wip-us.apache.org/repos/asf/spark/blob/2aeee569/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4d6b254..653d41f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Properties
+
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -262,9 +264,10 @@ class DAGSchedulerSuite
   rdd: RDD[_],
   partitions: Array[Int],
   func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
-  listener: JobListener = jobListener): Int = {
+  listener: JobListener = jobListener,
+  properties: Properties = null): Int = {
 val jobId = scheduler.nextJobId.getAndIncrement()
-runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
+runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
 jobId
   }
 
@@ -1322,6 +1325,106 @@ class DAGSchedulerSuite
 

spark git commit: [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage

2015-11-25 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master b9b6fbe89 -> 0a5aef753


[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a 
Stage

This issue was addressed in https://github.com/apache/spark/pull/5494, but the 
fix in that PR, while safe in the sense that it will prevent the SparkContext 
from shutting down, misses the actual bug.  The intent of `submitMissingTasks` 
should be understood as "submit the Tasks that are missing for the Stage, and 
run them as part of the ActiveJob identified by jobId".  Because of a 
long-standing bug, the `jobId` parameter was never being used.  Instead, we 
were trying to use the jobId with which the Stage was created -- which may no 
longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId 
parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since it has existed 
since 1.0.

kayousterhout pankajarora12

Author: Mark Hamstra 
Author: Imran Rashid 

Closes #6291 from markhamstra/SPARK-6880.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a5aef75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a5aef75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a5aef75

Branch: refs/heads/master
Commit: 0a5aef753e70e93d7e56054f354a52e4d4e18932
Parents: b9b6fbe
Author: Mark Hamstra 
Authored: Wed Nov 25 09:34:34 2015 -0600
Committer: Imran Rashid 
Committed: Wed Nov 25 09:34:34 2015 -0600

--
 .../apache/spark/scheduler/DAGScheduler.scala   |   6 +-
 .../spark/scheduler/DAGSchedulerSuite.scala | 107 ++-
 2 files changed, 109 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a5aef75/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 77a184d..e01a960 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -946,7 +946,9 @@ class DAGScheduler(
   stage.resetInternalAccumulators()
 }
 
-val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+// with this Stage
+val properties = jobIdToActiveJob(jobId).properties
 
 runningStages += stage
 // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -1047,7 +1049,7 @@ class DAGScheduler(
   stage.pendingPartitions ++= tasks.map(_.partitionId)
   logDebug("New pending partitions: " + stage.pendingPartitions)
   taskScheduler.submitTasks(new TaskSet(
-tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
   stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
 } else {
   // Because we posted SparkListenerStageSubmitted earlier, we should mark

http://git-wip-us.apache.org/repos/asf/spark/blob/0a5aef75/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4d6b254..653d41f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Properties
+
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -262,9 +264,10 @@ class DAGSchedulerSuite
   rdd: RDD[_],
   partitions: Array[Int],
   func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
-  listener: JobListener = jobListener): Int = {
+  listener: JobListener = jobListener,
+  properties: Properties = null): Int = {
 val jobId = scheduler.nextJobId.getAndIncrement()
-runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
+runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
 jobId
   }
 
@@ -1322,6 +1325,106 @@ class DAGSchedulerSuite
 assertDataStructuresEmpty()
   }
 
+  def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: Str

spark git commit: [SPARK-11860][PYSPARK][DOCUMENTATION] Invalid argument specification …

2015-11-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 638500265 -> b9b6fbe89


[SPARK-11860][PYSPARK][DOCUMENTATION] Invalid argument specification …

…for registerFunction [Python]

Straightforward change on the python doc

Author: Jeff Zhang 

Closes #9901 from zjffdu/SPARK-11860.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9b6fbe8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9b6fbe8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9b6fbe8

Branch: refs/heads/master
Commit: b9b6fbe89b6d1a890faa02c1a53bb670a6255362
Parents: 6385002
Author: Jeff Zhang 
Authored: Wed Nov 25 13:49:58 2015 +
Committer: Sean Owen 
Committed: Wed Nov 25 13:49:58 2015 +

--
 python/pyspark/sql/context.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b9b6fbe8/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 5a85ac3..a49c1b5 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -195,14 +195,15 @@ class SQLContext(object):
 @ignore_unicode_prefix
 @since(1.2)
 def registerFunction(self, name, f, returnType=StringType()):
-"""Registers a lambda function as a UDF so it can be used in SQL 
statements.
+"""Registers a python function (including lambda function) as a UDF
+so it can be used in SQL statements.
 
 In addition to a name and the function itself, the return type can be 
optionally specified.
 When the return type is not given it default to a string and 
conversion will automatically
 be done.  For any other return type, the produced object must match 
the specified type.
 
 :param name: name of the UDF
-:param samplingRatio: lambda function
+:param f: python function
 :param returnType: a :class:`DataType` object
 
 >>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11860][PYSPARK][DOCUMENTATION] Invalid argument specification …

2015-11-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a986a3bde -> 4971eaaa5


[SPARK-11860][PYSPARK][DOCUMENTATION] Invalid argument specification …

…for registerFunction [Python]

Straightforward change on the python doc

Author: Jeff Zhang 

Closes #9901 from zjffdu/SPARK-11860.

(cherry picked from commit b9b6fbe89b6d1a890faa02c1a53bb670a6255362)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4971eaaa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4971eaaa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4971eaaa

Branch: refs/heads/branch-1.6
Commit: 4971eaaa5768ad20c5a77278e435c97409a9ca8f
Parents: a986a3b
Author: Jeff Zhang 
Authored: Wed Nov 25 13:49:58 2015 +
Committer: Sean Owen 
Committed: Wed Nov 25 13:50:10 2015 +

--
 python/pyspark/sql/context.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4971eaaa/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 5a85ac3..a49c1b5 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -195,14 +195,15 @@ class SQLContext(object):
 @ignore_unicode_prefix
 @since(1.2)
 def registerFunction(self, name, f, returnType=StringType()):
-"""Registers a lambda function as a UDF so it can be used in SQL 
statements.
+"""Registers a python function (including lambda function) as a UDF
+so it can be used in SQL statements.
 
 In addition to a name and the function itself, the return type can be 
optionally specified.
 When the return type is not given it default to a string and 
conversion will automatically
 be done.  For any other return type, the produced object must match 
the specified type.
 
 :param name: name of the UDF
-:param samplingRatio: lambda function
+:param f: python function
 :param returnType: a :class:`DataType` object
 
 >>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11686][CORE] Issue WARN when dynamic allocation is disabled due to spark.dynamicAllocation.enabled and spark.executor.instances both set

2015-11-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 997896643 -> a986a3bde


[SPARK-11686][CORE] Issue WARN when dynamic allocation is disabled due to 
spark.dynamicAllocation.enabled and spark.executor.instances both set

Changed the log type to a 'warning' instead of 'info' as required.

Author: Ashwin Swaroop 

Closes #9926 from ashwinswaroop/master.

(cherry picked from commit 63850026576b3ea7783f9d4b975171dc3cff6e4c)
Signed-off-by: Sean Owen 
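
For context, the warning fires when both settings are supplied together, because an
explicit executor count disables dynamic allocation. A minimal sketch of that
configuration (property values are illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("example")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "4") // explicit count wins; dynamic allocation stays off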


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a986a3bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a986a3bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a986a3bd

Branch: refs/heads/branch-1.6
Commit: a986a3bde7426fbbc6b95848dcc55f1d6f22a1b1
Parents: 9978966
Author: Ashwin Swaroop 
Authored: Wed Nov 25 13:41:14 2015 +
Committer: Sean Owen 
Committed: Wed Nov 25 13:41:26 2015 +

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a986a3bd/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e19ba11..2c10779 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -556,7 +556,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 // Optionally scale number of executors dynamically based on workload. Exposed for testing.
 val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
 if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
-  logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
 }
 
 _executorAllocationManager =


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11686][CORE] Issue WARN when dynamic allocation is disabled due to spark.dynamicAllocation.enabled and spark.executor.instances both set

2015-11-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master a0f1a1183 -> 638500265


[SPARK-11686][CORE] Issue WARN when dynamic allocation is disabled due to 
spark.dynamicAllocation.enabled and spark.executor.instances both set

Changed the log type to a 'warning' instead of 'info' as required.

Author: Ashwin Swaroop 

Closes #9926 from ashwinswaroop/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63850026
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63850026
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63850026

Branch: refs/heads/master
Commit: 63850026576b3ea7783f9d4b975171dc3cff6e4c
Parents: a0f1a11
Author: Ashwin Swaroop 
Authored: Wed Nov 25 13:41:14 2015 +
Committer: Sean Owen 
Committed: Wed Nov 25 13:41:14 2015 +

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/63850026/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e19ba11..2c10779 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -556,7 +556,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 // Optionally scale number of executors dynamically based on workload. Exposed for testing.
 val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
 if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
-  logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
 }
 
 _executorAllocationManager =


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11981][SQL] Move implementations of methods back to DataFrame from Queryable

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 2610e0612 -> a0f1a1183


[SPARK-11981][SQL] Move implementations of methods back to DataFrame from 
Queryable

Also added show methods to Dataset.

Author: Reynold Xin 

Closes #9964 from rxin/SPARK-11981.
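
A short usage sketch of the methods whose implementations move onto DataFrame (assumes a
Spark 1.6 SQLContext named `sqlContext`; output goes to the console):

    val df = sqlContext.range(0, 10).toDF("id")
    df.printSchema()             // schema rendered as a tree
    df.explain()                 // physical plan only
    df.explain(extended = true)  // logical and physical plans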


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0f1a118
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0f1a118
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0f1a118

Branch: refs/heads/master
Commit: a0f1a11837bfffb76582499d36fbaf21a1d628cb
Parents: 2610e06
Author: Reynold Xin 
Authored: Wed Nov 25 01:03:18 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 01:03:18 2015 -0800

--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 35 -
 .../scala/org/apache/spark/sql/Dataset.scala| 77 +++-
 .../apache/spark/sql/execution/Queryable.scala  | 32 ++--
 3 files changed, 111 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a0f1a118/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5eca1db..d8319b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, 
SqlParser}
-import org.apache.spark.sql.execution.{EvaluatePython, FileRelation, 
LogicalRDD, QueryExecution, Queryable, SQLExecution}
+import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.sources.HadoopFsRelation
@@ -112,8 +112,8 @@ private[sql] object DataFrame {
  */
 @Experimental
 class DataFrame private[sql](
-@transient val sqlContext: SQLContext,
-@DeveloperApi @transient val queryExecution: QueryExecution)
+@transient override val sqlContext: SQLContext,
+@DeveloperApi @transient override val queryExecution: QueryExecution)
   extends Queryable with Serializable {
 
   // Note for Spark contributors: if adding or updating any action in 
`DataFrame`, please make sure
@@ -283,6 +283,35 @@ class DataFrame private[sql](
   def schema: StructType = queryExecution.analyzed.schema
 
   /**
+   * Prints the schema to the console in a nice tree format.
+   * @group basic
+   * @since 1.3.0
+   */
+  // scalastyle:off println
+  override def printSchema(): Unit = println(schema.treeString)
+  // scalastyle:on println
+
+  /**
+   * Prints the plans (logical and physical) to the console for debugging 
purposes.
+   * @group basic
+   * @since 1.3.0
+   */
+  override def explain(extended: Boolean): Unit = {
+val explain = ExplainCommand(queryExecution.logical, extended = extended)
+sqlContext.executePlan(explain).executedPlan.executeCollect().foreach {
+  // scalastyle:off println
+  r => println(r.getString(0))
+  // scalastyle:on println
+}
+  }
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   * @since 1.3.0
+   */
+  override def explain(): Unit = explain(extended = false)
+
+  /**
* Returns all column names and their data types as an array.
* @group basic
* @since 1.3.0

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f1a118/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 97eb5b9..da46001 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,8 +61,8 @@ import org.apache.spark.util.Utils
  */
 @Experimental
 class Dataset[T] private[sql](
-@transient val sqlContext: SQLContext,
-@transient val queryExecution: QueryExecution,
+@transient override val sqlContext: SQLContext,
+@transient override val queryExecution: QueryExecution,
 tEncoder: Encoder[T]) extends Queryable with Serializable {
 
   /**
@@ -85,7 +85,25 @@ class Dataset[T] private[sql](
* Returns the schema of the encoded form of the objects in this [[Dataset]].
* @since

spark git commit: [SPARK-11981][SQL] Move implementations of methods back to DataFrame from Queryable

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 007eb4ac0 -> 997896643


[SPARK-11981][SQL] Move implementations of methods back to DataFrame from 
Queryable

Also added show methods to Dataset.

Author: Reynold Xin 

Closes #9964 from rxin/SPARK-11981.

(cherry picked from commit a0f1a11837bfffb76582499d36fbaf21a1d628cb)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99789664
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99789664
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99789664

Branch: refs/heads/branch-1.6
Commit: 9978966438f05170cb6fdb8d304363015486363c
Parents: 007eb4a
Author: Reynold Xin 
Authored: Wed Nov 25 01:03:18 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 01:03:25 2015 -0800

--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 35 -
 .../scala/org/apache/spark/sql/Dataset.scala| 77 +++-
 .../apache/spark/sql/execution/Queryable.scala  | 32 ++--
 3 files changed, 111 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/99789664/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5eca1db..d8319b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, 
SqlParser}
-import org.apache.spark.sql.execution.{EvaluatePython, FileRelation, 
LogicalRDD, QueryExecution, Queryable, SQLExecution}
+import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.sources.HadoopFsRelation
@@ -112,8 +112,8 @@ private[sql] object DataFrame {
  */
 @Experimental
 class DataFrame private[sql](
-@transient val sqlContext: SQLContext,
-@DeveloperApi @transient val queryExecution: QueryExecution)
+@transient override val sqlContext: SQLContext,
+@DeveloperApi @transient override val queryExecution: QueryExecution)
   extends Queryable with Serializable {
 
   // Note for Spark contributors: if adding or updating any action in 
`DataFrame`, please make sure
@@ -283,6 +283,35 @@ class DataFrame private[sql](
   def schema: StructType = queryExecution.analyzed.schema
 
   /**
+   * Prints the schema to the console in a nice tree format.
+   * @group basic
+   * @since 1.3.0
+   */
+  // scalastyle:off println
+  override def printSchema(): Unit = println(schema.treeString)
+  // scalastyle:on println
+
+  /**
+   * Prints the plans (logical and physical) to the console for debugging 
purposes.
+   * @group basic
+   * @since 1.3.0
+   */
+  override def explain(extended: Boolean): Unit = {
+val explain = ExplainCommand(queryExecution.logical, extended = extended)
+sqlContext.executePlan(explain).executedPlan.executeCollect().foreach {
+  // scalastyle:off println
+  r => println(r.getString(0))
+  // scalastyle:on println
+}
+  }
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   * @since 1.3.0
+   */
+  override def explain(): Unit = explain(extended = false)
+
+  /**
* Returns all column names and their data types as an array.
* @group basic
* @since 1.3.0

http://git-wip-us.apache.org/repos/asf/spark/blob/99789664/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 97eb5b9..da46001 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,8 +61,8 @@ import org.apache.spark.util.Utils
  */
 @Experimental
 class Dataset[T] private[sql](
-@transient val sqlContext: SQLContext,
-@transient val queryExecution: QueryExecution,
+@transient override val sqlContext: SQLContext,
+@transient override val queryExecution: QueryExecution,
 tEncoder: Encoder[T]) extends Queryable with Serializable {
 
   /**
@@ -85,7 +85,25 @@ class Dataset[T] p

spark git commit: [SPARK-11970][SQL] Adding JoinType into JoinWith and support Sample in Dataset API

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 7f030aa42 -> 007eb4ac0


[SPARK-11970][SQL] Adding JoinType into JoinWith and support Sample in Dataset 
API

Besides inner join, the other join types may also be useful when users are using the 
joinWith function. Thus, added the joinType into the existing joinWith call in Dataset 
APIs.

Also providing another joinWith interface for the cartesian-join-like 
functionality.

Please provide your opinions. marmbrus rxin cloud-fan Thank you!

Author: gatorsmile 

Closes #9921 from gatorsmile/joinWith.

(cherry picked from commit 2610e06124c7fc0b2b1cfb2e3050a35ab492fb71)
Signed-off-by: Reynold Xin 
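
A brief usage sketch of the two additions (hedged example: `ds` stands for any existing
Dataset[T], and the join condition and column names are illustrative):

    // New sample overloads on Dataset:
    val sampledFixedSeed = ds.sample(withReplacement = false, fraction = 0.1, seed = 42L)
    val sampledRandomSeed = ds.sample(withReplacement = false, fraction = 0.1)

    // joinWith now also accepts a join type string such as "inner", "outer",
    // "left_outer", "right_outer" or "leftsemi", for example:
    //   ds.joinWith(otherDs, joinCondition, "left_outer")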


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/007eb4ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/007eb4ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/007eb4ac

Branch: refs/heads/branch-1.6
Commit: 007eb4ac0a0e8ae68126e7815dd14d23953c207e
Parents: 7f030aa
Author: gatorsmile 
Authored: Wed Nov 25 01:02:36 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 01:02:42 2015 -0800

--
 .../scala/org/apache/spark/sql/Dataset.scala| 45 
 .../org/apache/spark/sql/DatasetSuite.scala | 36 +---
 2 files changed, 65 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/007eb4ac/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index dd84b8b..97eb5b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -20,16 +20,16 @@ package org.apache.spark.sql
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.rdd.RDD
 import org.apache.spark.api.java.function._
-
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{Queryable, QueryExecution}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * :: Experimental ::
@@ -83,7 +83,6 @@ class Dataset[T] private[sql](
 
   /**
* Returns the schema of the encoded form of the objects in this [[Dataset]].
-   *
* @since 1.6.0
*/
   def schema: StructType = resolvedTEncoder.schema
@@ -185,7 +184,6 @@ class Dataset[T] private[sql](
* .transform(featurize)
* .transform(...)
* }}}
-   *
* @since 1.6.0
*/
   def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
@@ -453,6 +451,21 @@ class Dataset[T] private[sql](
   c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] =
 selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, 
U5)]]
 
+  /**
+   * Returns a new [[Dataset]] by sampling a fraction of records.
+   * @since 1.6.0
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long) : 
Dataset[T] =
+withPlan(Sample(0.0, fraction, withReplacement, seed, _))
+
+  /**
+   * Returns a new [[Dataset]] by sampling a fraction of records, using a 
random seed.
+   * @since 1.6.0
+   */
+  def sample(withReplacement: Boolean, fraction: Double) : Dataset[T] = {
+sample(withReplacement, fraction, Utils.random.nextLong)
+  }
+
   /*  *
*  Set operations  *
*  */
@@ -511,13 +524,17 @@ class Dataset[T] private[sql](
* types as well as working with relational data where either side of the 
join has column
* names in common.
*
+   * @param other Right side of the join.
+   * @param condition Join expression.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, 
`leftsemi`.
* @since 1.6.0
*/
-  def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = {
+  def joinWith[U](other: Dataset[U], condition: Column, joinType: String): 
Dataset[(T, U)] = {
 val left = this.logicalPlan
 val right = other.logicalPlan
 
-val joined = sqlContext.executePlan(Join(left, right, Inner, 
Some(condition.expr)))
+val joined = sqlContext.executePlan(Join(left, right, joinType =
+  JoinType(joinType), Some(condition.expr)))
 val leftOutput = joined.analyzed.output.take(left.output.length)
 val rightOutput = joined.analyzed.output.takeRight(right.output.length)
 
@@ -540,6 +557,18 @@ class Dataset[T] private[sql](

spark git commit: [SPARK-11970][SQL] Adding JoinType into JoinWith and support Sample in Dataset API

2015-11-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 216988688 -> 2610e0612


[SPARK-11970][SQL] Adding JoinType into JoinWith and support Sample in Dataset 
API

Besides inner join, the other join types may also be useful when users are using the 
joinWith function. Thus, added the joinType into the existing joinWith call in Dataset 
APIs.

Also providing another joinWith interface for the cartesian-join-like 
functionality.

Please provide your opinions. marmbrus rxin cloud-fan Thank you!

Author: gatorsmile 

Closes #9921 from gatorsmile/joinWith.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2610e061
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2610e061
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2610e061

Branch: refs/heads/master
Commit: 2610e06124c7fc0b2b1cfb2e3050a35ab492fb71
Parents: 2169886
Author: gatorsmile 
Authored: Wed Nov 25 01:02:36 2015 -0800
Committer: Reynold Xin 
Committed: Wed Nov 25 01:02:36 2015 -0800

--
 .../scala/org/apache/spark/sql/Dataset.scala| 45 
 .../org/apache/spark/sql/DatasetSuite.scala | 36 +---
 2 files changed, 65 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2610e061/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index dd84b8b..97eb5b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -20,16 +20,16 @@ package org.apache.spark.sql
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.rdd.RDD
 import org.apache.spark.api.java.function._
-
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{Queryable, QueryExecution}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * :: Experimental ::
@@ -83,7 +83,6 @@ class Dataset[T] private[sql](
 
   /**
* Returns the schema of the encoded form of the objects in this [[Dataset]].
-   *
* @since 1.6.0
*/
   def schema: StructType = resolvedTEncoder.schema
@@ -185,7 +184,6 @@ class Dataset[T] private[sql](
* .transform(featurize)
* .transform(...)
* }}}
-   *
* @since 1.6.0
*/
   def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
@@ -453,6 +451,21 @@ class Dataset[T] private[sql](
   c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] =
 selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, 
U5)]]
 
+  /**
+   * Returns a new [[Dataset]] by sampling a fraction of records.
+   * @since 1.6.0
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long) : 
Dataset[T] =
+withPlan(Sample(0.0, fraction, withReplacement, seed, _))
+
+  /**
+   * Returns a new [[Dataset]] by sampling a fraction of records, using a 
random seed.
+   * @since 1.6.0
+   */
+  def sample(withReplacement: Boolean, fraction: Double) : Dataset[T] = {
+sample(withReplacement, fraction, Utils.random.nextLong)
+  }
+
   /*  *
*  Set operations  *
*  */
@@ -511,13 +524,17 @@ class Dataset[T] private[sql](
* types as well as working with relational data where either side of the 
join has column
* names in common.
*
+   * @param other Right side of the join.
+   * @param condition Join expression.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, 
`leftsemi`.
* @since 1.6.0
*/
-  def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = {
+  def joinWith[U](other: Dataset[U], condition: Column, joinType: String): 
Dataset[(T, U)] = {
 val left = this.logicalPlan
 val right = other.logicalPlan
 
-val joined = sqlContext.executePlan(Join(left, right, Inner, 
Some(condition.expr)))
+val joined = sqlContext.executePlan(Join(left, right, joinType =
+  JoinType(joinType), Some(condition.expr)))
 val leftOutput = joined.analyzed.output.take(left.output.length)
 val rightOutput = joined.analyzed.output.takeRight(right.output.length)
 
@@ -540,6 +557,18 @@ class Dataset[T] private[sql](
 }
   }
 
+  /**
+   * Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for 
eac