spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Repository: spark Updated Branches: refs/heads/branch-1.4 1df3e8230 -> f5af299ab [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task In the previous codes, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`. Author: Shixiong Zhu Closes #9978 from zsxwing/cached-threadpool. (cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5af299a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5af299a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5af299a Branch: refs/heads/branch-1.4 Commit: f5af299ab06f654e000c99917c703f989bffaa43 Parents: 1df3e82 Author: Shixiong Zhu Authored: Wed Nov 25 23:31:21 2015 -0800 Committer: Shixiong Zhu Committed: Wed Nov 25 23:35:33 2015 -0800 -- .../org/apache/spark/util/ThreadUtils.scala | 14 -- .../apache/spark/util/ThreadUtilsSuite.scala| 45 2 files changed, 56 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index ca5624a..94d581f 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -56,10 +56,18 @@ private[spark] object ThreadUtils { * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
*/ - def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = { + def newDaemonCachedThreadPool( + prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) -new ThreadPoolExecutor( - 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory) +val threadPool = new ThreadPoolExecutor( + maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks + maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory) +threadPool.allowCoreThreadTimeOut(true) +threadPool } /** http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 8c51e6b..1fb6364 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -23,6 +23,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ +import org.scalatest.concurrent.Eventually._ + import org.apache.spark.SparkFunSuite class ThreadUtilsSuite extends SparkFunSuite { @@ -58,6 +60,49 @@ class ThreadUtilsSuite extends SparkFunSuite { } } + test("newDaemonCachedThreadPool") { +val maxThreadNumber = 10 +val startThreadsLatch = new CountDownLatch(maxThreadNumber) +val latch = new CountDownLatch(1) +val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "ThreadUtilsSuite-newDaemonCachedThreadPool", + maxThreadNumber, + keepAliveSeconds = 2) +try { + for (_ <- 1 to maxThreadNumber) { +cachedThreadPool.execute(new Runnable { + override def run(): Unit = { +startThreadsLatch.countDown() +latch.await(10, TimeUnit.SECONDS) + } +}) + } + startThreadsLatch.await(10, TimeUnit.SECONDS) + assert(cachedThreadPool.getActiveCount === maxThreadNumber) + assert(cachedThreadPool.getQueue.size === 0) + + // Submit a new task and it should be put into the queue since the thread number reaches the + // limitation + cachedThreadPool.execute(new Runnable { +override def run(): Unit
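The fix shown above hinges on three ThreadPoolExecutor details working together: a core size equal to the maximum, an unbounded LinkedBlockingQueue so surplus tasks wait instead of being rejected, and allowCoreThreadTimeOut(true) so idle threads are still reclaimed after keepAliveSeconds. The following is a minimal standalone sketch of that factory, not the exact Spark source: it substitutes Guava's ThreadFactoryBuilder (which Spark's namedThreadFactory helper wraps) so it compiles on its own.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import com.google.common.util.concurrent.ThreadFactoryBuilder

object CachedPoolSketch {
  // Approximation of the patched factory; Spark's namedThreadFactory is replaced
  // here by Guava's ThreadFactoryBuilder so the sketch is self-contained.
  def newDaemonCachedThreadPool(
      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
    val threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat(prefix + "-%d")
      .build()
    val threadPool = new ThreadPoolExecutor(
      maxThreadNumber, // corePoolSize: threads are created up to this bound before queueing starts
      maxThreadNumber, // maximumPoolSize: effectively unused with an unbounded queue
      keepAliveSeconds,
      TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable], // unbounded: extra tasks are cached, not rejected
      threadFactory)
    // Without this, core threads never time out and the pool keeps
    // maxThreadNumber idle threads alive forever.
    threadPool.allowCoreThreadTimeOut(true)
    threadPool
  }
}
```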
spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Repository: spark Updated Branches: refs/heads/branch-1.5 b1fcefca6 -> 7900d192e [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task In the previous codes, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`. Author: Shixiong Zhu Closes #9978 from zsxwing/cached-threadpool. (cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7900d192 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7900d192 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7900d192 Branch: refs/heads/branch-1.5 Commit: 7900d192e8adf501fbed0d0704d60d2c0e63a764 Parents: b1fcefc Author: Shixiong Zhu Authored: Wed Nov 25 23:31:21 2015 -0800 Committer: Shixiong Zhu Committed: Wed Nov 25 23:31:53 2015 -0800 -- .../org/apache/spark/util/ThreadUtils.scala | 14 -- .../apache/spark/util/ThreadUtilsSuite.scala| 45 2 files changed, 56 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 06976f8..3159ef7 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -57,10 +57,18 @@ private[spark] object ThreadUtils { * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
*/ - def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = { + def newDaemonCachedThreadPool( + prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) -new ThreadPoolExecutor( - 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory) +val threadPool = new ThreadPoolExecutor( + maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks + maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory) +threadPool.allowCoreThreadTimeOut(true) +threadPool } /** http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 620e4de..92ae038 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -24,6 +24,8 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.Random +import org.scalatest.concurrent.Eventually._ + import org.apache.spark.SparkFunSuite class ThreadUtilsSuite extends SparkFunSuite { @@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite { } } + test("newDaemonCachedThreadPool") { +val maxThreadNumber = 10 +val startThreadsLatch = new CountDownLatch(maxThreadNumber) +val latch = new CountDownLatch(1) +val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "ThreadUtilsSuite-newDaemonCachedThreadPool", + maxThreadNumber, + keepAliveSeconds = 2) +try { + for (_ <- 1 to maxThreadNumber) { +cachedThreadPool.execute(new Runnable { + override def run(): Unit = { +startThreadsLatch.countDown() +latch.await(10, TimeUnit.SECONDS) + } +}) + } + startThreadsLatch.await(10, TimeUnit.SECONDS) + assert(cachedThreadPool.getActiveCount === maxThreadNumber) + assert(cachedThreadPool.getQueue.size === 0) + + // Submit a new task and it should be put into the queue since the thread number reaches the + // limitation + cachedThreadPool.execute(new Runnable { +override def run(): Unit = { + latch.await(10,
spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Repository: spark Updated Branches: refs/heads/branch-1.6 7e7f2627f -> 0df6beccc [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task In the previous codes, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`. Author: Shixiong Zhu Closes #9978 from zsxwing/cached-threadpool. (cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df6becc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df6becc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df6becc Branch: refs/heads/branch-1.6 Commit: 0df6beccc84166a00c7c98929bf487d9cea68e1d Parents: 7e7f262 Author: Shixiong Zhu Authored: Wed Nov 25 23:31:21 2015 -0800 Committer: Shixiong Zhu Committed: Wed Nov 25 23:31:36 2015 -0800 -- .../org/apache/spark/util/ThreadUtils.scala | 14 -- .../apache/spark/util/ThreadUtilsSuite.scala| 45 2 files changed, 56 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5328344..f9fbe2f 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -56,10 +56,18 @@ private[spark] object ThreadUtils { * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
*/ - def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = { + def newDaemonCachedThreadPool( + prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) -new ThreadPoolExecutor( - 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory) +val threadPool = new ThreadPoolExecutor( + maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks + maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory) +threadPool.allowCoreThreadTimeOut(true) +threadPool } /** http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 620e4de..92ae038 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -24,6 +24,8 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.Random +import org.scalatest.concurrent.Eventually._ + import org.apache.spark.SparkFunSuite class ThreadUtilsSuite extends SparkFunSuite { @@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite { } } + test("newDaemonCachedThreadPool") { +val maxThreadNumber = 10 +val startThreadsLatch = new CountDownLatch(maxThreadNumber) +val latch = new CountDownLatch(1) +val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "ThreadUtilsSuite-newDaemonCachedThreadPool", + maxThreadNumber, + keepAliveSeconds = 2) +try { + for (_ <- 1 to maxThreadNumber) { +cachedThreadPool.execute(new Runnable { + override def run(): Unit = { +startThreadsLatch.countDown() +latch.await(10, TimeUnit.SECONDS) + } +}) + } + startThreadsLatch.await(10, TimeUnit.SECONDS) + assert(cachedThreadPool.getActiveCount === maxThreadNumber) + assert(cachedThreadPool.getQueue.size === 0) + + // Submit a new task and it should be put into the queue since the thread number reaches the + // limitation + cachedThreadPool.execute(new Runnable { +override def run(): Unit = { + latch.await(10,
spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Repository: spark Updated Branches: refs/heads/master 068b6438d -> d3ef69332 [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task In the previous codes, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`. Author: Shixiong Zhu Closes #9978 from zsxwing/cached-threadpool. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3ef6933 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3ef6933 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3ef6933 Branch: refs/heads/master Commit: d3ef693325f91a1ed340c9756c81244a80398eb2 Parents: 068b643 Author: Shixiong Zhu Authored: Wed Nov 25 23:31:21 2015 -0800 Committer: Shixiong Zhu Committed: Wed Nov 25 23:31:21 2015 -0800 -- .../org/apache/spark/util/ThreadUtils.scala | 14 -- .../apache/spark/util/ThreadUtilsSuite.scala| 45 2 files changed, 56 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5328344..f9fbe2f 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -56,10 +56,18 @@ private[spark] object ThreadUtils { * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
*/ - def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = { + def newDaemonCachedThreadPool( + prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) -new ThreadPoolExecutor( - 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory) +val threadPool = new ThreadPoolExecutor( + maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks + maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory) +threadPool.allowCoreThreadTimeOut(true) +threadPool } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 620e4de..92ae038 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -24,6 +24,8 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.Random +import org.scalatest.concurrent.Eventually._ + import org.apache.spark.SparkFunSuite class ThreadUtilsSuite extends SparkFunSuite { @@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite { } } + test("newDaemonCachedThreadPool") { +val maxThreadNumber = 10 +val startThreadsLatch = new CountDownLatch(maxThreadNumber) +val latch = new CountDownLatch(1) +val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "ThreadUtilsSuite-newDaemonCachedThreadPool", + maxThreadNumber, + keepAliveSeconds = 2) +try { + for (_ <- 1 to maxThreadNumber) { +cachedThreadPool.execute(new Runnable { + override def run(): Unit = { +startThreadsLatch.countDown() +latch.await(10, TimeUnit.SECONDS) + } +}) + } + startThreadsLatch.await(10, TimeUnit.SECONDS) + assert(cachedThreadPool.getActiveCount === maxThreadNumber) + assert(cachedThreadPool.getQueue.size === 0) + + // Submit a new task and it should be put into the queue since the thread number reaches the + // limitation + cachedThreadPool.execute(new Runnable { +override def run(): Unit = { + latch.await(10, TimeUnit.SECONDS) +} + }) + + assert(cachedThreadPool.getActiveCount === maxThreadNumber
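For readers wondering why the old pool "doesn't cache any task": a SynchronousQueue has no capacity, so ThreadPoolExecutor.execute only hands a task off if an idle worker is already waiting or a new thread can still be created; once the pool is at its maximum and every thread is busy, the next submission is rejected rather than queued. The short sketch below is not part of the patch; it simply reproduces that failure mode with the pre-fix configuration.

```scala
import java.util.concurrent.{Executors, RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object SynchronousQueueRejection {
  def main(args: Array[String]): Unit = {
    // The pre-fix shape: 0 core threads, a small maximum, and a SynchronousQueue.
    val oldStylePool = new ThreadPoolExecutor(
      0, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Executors.defaultThreadFactory())

    val busyTask = new Runnable { override def run(): Unit = Thread.sleep(10000) }
    oldStylePool.execute(busyTask) // first thread created
    oldStylePool.execute(busyTask) // second thread created, pool now at its maximum
    try {
      // No idle thread is waiting on the SynchronousQueue, so this is rejected, not cached.
      oldStylePool.execute(busyTask)
    } catch {
      case _: RejectedExecutionException => println("third task rejected instead of queued")
    } finally {
      oldStylePool.shutdownNow()
    }
  }
}
```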
spark git commit: [SPARK-11980][SPARK-10621][SQL] Fix json_tuple and add test cases for
Repository: spark Updated Branches: refs/heads/master d1930ec01 -> 068b6438d [SPARK-11980][SPARK-10621][SQL] Fix json_tuple and add test cases for Added Python test cases for the function `isnan`, `isnull`, `nanvl` and `json_tuple`. Fixed a bug in the function `json_tuple` rxin , could you help me review my changes? Please let me know anything is missing. Thank you! Have a good Thanksgiving day! Author: gatorsmile Closes #9977 from gatorsmile/json_tuple. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/068b6438 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/068b6438 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/068b6438 Branch: refs/heads/master Commit: 068b6438d6886ce5b4aa698383866f466d913d66 Parents: d1930ec Author: gatorsmile Authored: Wed Nov 25 23:24:33 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 23:24:33 2015 -0800 -- python/pyspark/sql/functions.py | 44 1 file changed, 34 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/068b6438/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index e3786e0..9062594 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -286,14 +286,6 @@ def countDistinct(col, *cols): return Column(jc) -@since(1.4) -def monotonicallyIncreasingId(): -""" -.. note:: Deprecated in 1.6, use monotonically_increasing_id instead. -""" -return monotonically_increasing_id() - - @since(1.6) def input_file_name(): """Creates a string column for the file name of the current Spark task. @@ -305,6 +297,10 @@ def input_file_name(): @since(1.6) def isnan(col): """An expression that returns true iff the column is NaN. + +>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b")) +>>> df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).collect() +[Row(r1=False, r2=False), Row(r1=True, r2=True)] """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.isnan(_to_java_column(col))) @@ -313,11 +309,23 @@ def isnan(col): @since(1.6) def isnull(col): """An expression that returns true iff the column is null. + +>>> df = sqlContext.createDataFrame([(1, None), (None, 2)], ("a", "b")) +>>> df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).collect() +[Row(r1=False, r2=False), Row(r1=True, r2=True)] """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.isnull(_to_java_column(col))) +@since(1.4) +def monotonicallyIncreasingId(): +""" +.. note:: Deprecated in 1.6, use monotonically_increasing_id instead. +""" +return monotonically_increasing_id() + + @since(1.6) def monotonically_increasing_id(): """A column that generates monotonically increasing 64-bit integers. @@ -344,6 +352,10 @@ def nanvl(col1, col2): """Returns col1 if it is not NaN, or col2 if col1 is NaN. Both inputs should be floating point columns (DoubleType or FloatType). 
+ +>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b")) +>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect() +[Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)] """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2))) @@ -1460,6 +1472,7 @@ def explode(col): return Column(jc) +@ignore_unicode_prefix @since(1.6) def get_json_object(col, path): """ @@ -1468,22 +1481,33 @@ def get_json_object(col, path): :param col: string column in json format :param path: path to the json object to extract + +>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')] +>>> df = sqlContext.createDataFrame(data, ("key", "jstring")) +>>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \ + get_json_object(df.jstring, '$.f2').alias("c1") ).collect() +[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)] """ sc = SparkContext._active_spark_context jc = sc._jvm.functions.get_json_object(_to_java_column(col), path) return Column(jc) +@ignore_unicode_prefix @since(1.6) -def json_tuple(col, fields): +def json_tuple(col, *fields): """Creates a new row for a json column according to the given field names. :param col: string column in json format :param fields: list of fields to extract +>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''
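The doctests above are Python, but the same functions exist in the Scala API, and the signature point of the fix is easier to see there: json_tuple is variadic over field names, which the corrected Python wrapper (def json_tuple(col, *fields)) now mirrors. A rough Scala equivalent of the new doctest, assuming a Spark 1.6-era spark-shell where sqlContext is predefined:

```scala
// Paste into a Spark 1.6-era spark-shell, where sqlContext is already defined.
import sqlContext.implicits._
import org.apache.spark.sql.functions.{get_json_object, json_tuple}

val df = Seq(
  ("1", """{"f1": "value1", "f2": "value2"}"""),
  ("2", """{"f1": "value12"}""")).toDF("key", "jstring")

// json_tuple expands one JSON string column into one output column per requested field.
df.select($"key", json_tuple($"jstring", "f1", "f2")).show()

// get_json_object extracts a single value addressed by a JSONPath-like expression.
df.select($"key", get_json_object($"jstring", "$.f1").alias("f1")).show()
```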
spark git commit: [SPARK-11980][SPARK-10621][SQL] Fix json_tuple and add test cases for
Repository: spark Updated Branches: refs/heads/branch-1.6 d40bf9ad8 -> 7e7f2627f [SPARK-11980][SPARK-10621][SQL] Fix json_tuple and add test cases for Added Python test cases for the function `isnan`, `isnull`, `nanvl` and `json_tuple`. Fixed a bug in the function `json_tuple` rxin , could you help me review my changes? Please let me know anything is missing. Thank you! Have a good Thanksgiving day! Author: gatorsmile Closes #9977 from gatorsmile/json_tuple. (cherry picked from commit 068b6438d6886ce5b4aa698383866f466d913d66) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e7f2627 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e7f2627 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e7f2627 Branch: refs/heads/branch-1.6 Commit: 7e7f2627f941585a6fb1e086e22d6d1d25b692ab Parents: d40bf9a Author: gatorsmile Authored: Wed Nov 25 23:24:33 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 23:24:40 2015 -0800 -- python/pyspark/sql/functions.py | 44 1 file changed, 34 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7e7f2627/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index e3786e0..9062594 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -286,14 +286,6 @@ def countDistinct(col, *cols): return Column(jc) -@since(1.4) -def monotonicallyIncreasingId(): -""" -.. note:: Deprecated in 1.6, use monotonically_increasing_id instead. -""" -return monotonically_increasing_id() - - @since(1.6) def input_file_name(): """Creates a string column for the file name of the current Spark task. @@ -305,6 +297,10 @@ def input_file_name(): @since(1.6) def isnan(col): """An expression that returns true iff the column is NaN. + +>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b")) +>>> df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).collect() +[Row(r1=False, r2=False), Row(r1=True, r2=True)] """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.isnan(_to_java_column(col))) @@ -313,11 +309,23 @@ def isnan(col): @since(1.6) def isnull(col): """An expression that returns true iff the column is null. + +>>> df = sqlContext.createDataFrame([(1, None), (None, 2)], ("a", "b")) +>>> df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).collect() +[Row(r1=False, r2=False), Row(r1=True, r2=True)] """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.isnull(_to_java_column(col))) +@since(1.4) +def monotonicallyIncreasingId(): +""" +.. note:: Deprecated in 1.6, use monotonically_increasing_id instead. +""" +return monotonically_increasing_id() + + @since(1.6) def monotonically_increasing_id(): """A column that generates monotonically increasing 64-bit integers. @@ -344,6 +352,10 @@ def nanvl(col1, col2): """Returns col1 if it is not NaN, or col2 if col1 is NaN. Both inputs should be floating point columns (DoubleType or FloatType). 
+ +>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b")) +>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect() +[Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)] """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2))) @@ -1460,6 +1472,7 @@ def explode(col): return Column(jc) +@ignore_unicode_prefix @since(1.6) def get_json_object(col, path): """ @@ -1468,22 +1481,33 @@ def get_json_object(col, path): :param col: string column in json format :param path: path to the json object to extract + +>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')] +>>> df = sqlContext.createDataFrame(data, ("key", "jstring")) +>>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \ + get_json_object(df.jstring, '$.f2').alias("c1") ).collect() +[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)] """ sc = SparkContext._active_spark_context jc = sc._jvm.functions.get_json_object(_to_java_column(col), path) return Column(jc) +@ignore_unicode_prefix @since(1.6) -def json_tuple(col, fields): +def json_tuple(col, *fields): """Creates a new row for a json column according to the given field names. :param col: string column in json format
spark git commit: [SPARK-12003] [SQL] remove the prefix for name after expanded star
Repository: spark Updated Branches: refs/heads/branch-1.6 399739702 -> d40bf9ad8 [SPARK-12003] [SQL] remove the prefix for name after expanded star Right now, the expanded star will include the name of the expression as a prefix for the column; that's no better than not expanding it, so we should not have the prefix. Author: Davies Liu Closes #9984 from davies/expand_star. (cherry picked from commit d1930ec01ab5a9d83f801f8ae8d4f15a38d98b76) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d40bf9ad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d40bf9ad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d40bf9ad Branch: refs/heads/branch-1.6 Commit: d40bf9ad881f4ba6c550cd61acc2e8c29c9dc60f Parents: 3997397 Author: Davies Liu Authored: Wed Nov 25 21:25:20 2015 -0800 Committer: Davies Liu Committed: Wed Nov 25 21:25:31 2015 -0800 -- .../scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d40bf9ad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 1b2a8dc..4f89b46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -204,7 +204,7 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu case s: StructType => s.zipWithIndex.map { case (f, i) => val extract = GetStructField(attribute.get, i) -Alias(extract, target.get + "." + f.name)() +Alias(extract, f.name)() } case _ => {
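The one-line diff is easiest to read in terms of its user-visible effect on star expansion over a struct column. A hedged spark-shell illustration follows (again assuming a 1.6-era shell with sqlContext predefined); it is not taken from the patch's own tests.

```scala
import sqlContext.implicits._
import org.apache.spark.sql.functions.struct

// Build a one-row DataFrame with a struct column named "s".
val nested = Seq((1, "x")).toDF("a", "b").select(struct($"a", $"b").as("s"))

// Expanding the struct: before this change the aliases came out as "s.a" / "s.b";
// with the prefix removed they are plain "a" / "b".
nested.select($"s.*").columns.foreach(println)
```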
spark git commit: [SPARK-12003] [SQL] remove the prefix for name after expanded star
Repository: spark Updated Branches: refs/heads/master cc243a079 -> d1930ec01 [SPARK-12003] [SQL] remove the prefix for name after expanded star Right now, the expanded star will include the name of the expression as a prefix for the column; that's no better than not expanding it, so we should not have the prefix. Author: Davies Liu Closes #9984 from davies/expand_star. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1930ec0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1930ec0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1930ec0 Branch: refs/heads/master Commit: d1930ec01ab5a9d83f801f8ae8d4f15a38d98b76 Parents: cc243a0 Author: Davies Liu Authored: Wed Nov 25 21:25:20 2015 -0800 Committer: Davies Liu Committed: Wed Nov 25 21:25:20 2015 -0800 -- .../scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d1930ec0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 1b2a8dc..4f89b46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -204,7 +204,7 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu case s: StructType => s.zipWithIndex.map { case (f, i) => val extract = GetStructField(attribute.get, i) -Alias(extract, target.get + "." + f.name)() +Alias(extract, f.name)() } case _ => {
spark git commit: [SPARK-11206] Support SQL UI on the history server
Repository: spark Updated Branches: refs/heads/master 21e560641 -> cc243a079 [SPARK-11206] Support SQL UI on the history server On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution. To support SQL UI on the history server: 1. I added an `onOtherEvent` method to the `SparkListener` trait and post all SQL related events to the same event bus. 2. Two SQL events `SparkListenerSQLExecutionStart` and `SparkListenerSQLExecutionEnd` are defined in the sql module. 3. The new SQL events are written to event log using Jackson. 4. A new trait `SparkHistoryListenerFactory` is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using `java.util.ServiceLoader`. Author: Carson Wang Closes #9297 from carsonwang/SqlHistoryUI. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc243a07 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc243a07 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc243a07 Branch: refs/heads/master Commit: cc243a079b1c039d6e7f0b410d1654d94a090e14 Parents: 21e5606 Author: Carson Wang Authored: Wed Nov 25 15:13:13 2015 -0800 Committer: Marcelo Vanzin Committed: Wed Nov 25 15:13:13 2015 -0800 -- .rat-excludes | 1 + .../org/apache/spark/JavaSparkListener.java | 3 + .../org/apache/spark/SparkFirehoseListener.java | 4 + .../spark/scheduler/EventLoggingListener.scala | 4 + .../apache/spark/scheduler/SparkListener.scala | 24 +++- .../spark/scheduler/SparkListenerBus.scala | 1 + .../scala/org/apache/spark/ui/SparkUI.scala | 16 ++- .../org/apache/spark/util/JsonProtocol.scala| 11 +- spark.scheduler.SparkHistoryListenerFactory | 1 + .../scala/org/apache/spark/sql/SQLContext.scala | 18 ++- .../spark/sql/execution/SQLExecution.scala | 24 +--- .../spark/sql/execution/SparkPlanInfo.scala | 46 ++ .../sql/execution/metric/SQLMetricInfo.scala| 30 .../spark/sql/execution/metric/SQLMetrics.scala | 56 +--- .../spark/sql/execution/ui/ExecutionPage.scala | 4 +- .../spark/sql/execution/ui/SQLListener.scala| 139 +-- .../apache/spark/sql/execution/ui/SQLTab.scala | 12 +- .../spark/sql/execution/ui/SparkPlanGraph.scala | 20 +-- .../sql/execution/metric/SQLMetricsSuite.scala | 4 +- .../sql/execution/ui/SQLListenerSuite.scala | 43 +++--- .../spark/sql/test/SharedSQLContext.scala | 1 + 21 files changed, 327 insertions(+), 135 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/.rat-excludes -- diff --git a/.rat-excludes b/.rat-excludes index 08fba6d..7262c96 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -82,4 +82,5 @@ INDEX gen-java.* .*avpr org.apache.spark.sql.sources.DataSourceRegister +org.apache.spark.scheduler.SparkHistoryListenerFactory .*parquet http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/java/org/apache/spark/JavaSparkListener.java -- diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java index fa9acf0..23bc9a2 100644 --- a/core/src/main/java/org/apache/spark/JavaSparkListener.java +++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java @@ -82,4 +82,7 @@ public class JavaSparkListener implements SparkListener { @Override public void onBlockUpdated(SparkListenerBlockUpdated 
blockUpdated) { } + @Override + public void onOtherEvent(SparkListenerEvent event) { } + } http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/java/org/apache/spark/SparkFirehoseListener.java -- diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index 1214d05..e6b24af 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -118,4 +118,8 @@ public class SparkFirehoseListener implements SparkListener { onEvent(blockUpdated); } +@Override +public void onOtherEvent(SparkListenerEvent event) { +onEvent(event); +} } http://git-wip-us.apache.org/repos/asf/spark/blob/cc243a07/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
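The extension point this change adds to core is the onOtherEvent callback on SparkListener; events the scheduler itself does not understand, such as the new SQL execution start/end events, are delivered through it and also written to the event log for the history server. A hypothetical listener sketch follows; it assumes the other SparkListener callbacks keep their no-op defaults and matches the SQL events by class name so it needs no compile-time dependency on the sql module.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Hypothetical listener: logs the custom SQL execution events this patch routes
// through the shared listener bus (and, via the event log, to the history server).
class SqlEventLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit =
    event.getClass.getSimpleName match {
      case "SparkListenerSQLExecutionStart" => println(s"SQL execution started: $event")
      case "SparkListenerSQLExecutionEnd"   => println(s"SQL execution finished: $event")
      case _                                => // some other custom event; ignore it
    }
}

// Usage, assuming an existing SparkContext named sc:
// sc.addSparkListener(new SqlEventLogger)
```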
spark git commit: [SPARK-11983][SQL] remove all unused codegen fallback trait
Repository: spark Updated Branches: refs/heads/master ecac28354 -> 21e560641 [SPARK-11983][SQL] remove all unused codegen fallback trait Author: Daoyuan Wang Closes #9966 from adrian-wang/removeFallback. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21e56064 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21e56064 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21e56064 Branch: refs/heads/master Commit: 21e5606419c4b7462d30580c549e9bfa0123ae23 Parents: ecac283 Author: Daoyuan Wang Authored: Wed Nov 25 13:51:30 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 13:51:30 2015 -0800 -- .../scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 3 +-- .../spark/sql/catalyst/expressions/regexpExpressions.scala | 4 ++-- .../spark/sql/catalyst/expressions/NonFoldableLiteral.scala | 3 +-- 3 files changed, 4 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/21e56064/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 533d17e..a2c6c39 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -104,8 +104,7 @@ object Cast { } /** Cast the child expression to the target data type. */ -case class Cast(child: Expression, dataType: DataType) - extends UnaryExpression with CodegenFallback { +case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { override def toString: String = s"cast($child as ${dataType.simpleString})" http://git-wip-us.apache.org/repos/asf/spark/blob/21e56064/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala index 9e484c5..adef605 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala @@ -66,7 +66,7 @@ trait StringRegexExpression extends ImplicitCastInputTypes { * Simple RegEx pattern matching function */ case class Like(left: Expression, right: Expression) - extends BinaryExpression with StringRegexExpression with CodegenFallback { + extends BinaryExpression with StringRegexExpression { override def escape(v: String): String = StringUtils.escapeLikeRegex(v) @@ -117,7 +117,7 @@ case class Like(left: Expression, right: Expression) case class RLike(left: Expression, right: Expression) - extends BinaryExpression with StringRegexExpression with CodegenFallback { + extends BinaryExpression with StringRegexExpression { override def escape(v: String): String = v override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).find(0) http://git-wip-us.apache.org/repos/asf/spark/blob/21e56064/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala index 
31ecf4a..118fd69 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NonFoldableLiteral.scala @@ -26,8 +26,7 @@ import org.apache.spark.sql.types._ * A literal value that is not foldable. Used in expression codegen testing to test code path * that behave differently based on foldable values. */ -case class NonFoldableLiteral(value: Any, dataType: DataType) - extends LeafExpression with CodegenFallback { +case class NonFoldableLiteral(value: Any, dataType: DataType) extends LeafExpression { override def foldable: Boolean = false override def nullable: Boolean = true
spark git commit: Fix Aggregator documentation (rename present to finish).
Repository: spark Updated Branches: refs/heads/branch-1.6 cd86d8c74 -> 399739702 Fix Aggregator documentation (rename present to finish). (cherry picked from commit ecac2835458bbf73fe59413d5bf921500c5b987d) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39973970 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39973970 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39973970 Branch: refs/heads/branch-1.6 Commit: 3997397024975b8affca0d609a231cde5e9959a5 Parents: cd86d8c Author: Reynold Xin Authored: Wed Nov 25 13:45:41 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 13:48:15 2015 -0800 -- .../main/scala/org/apache/spark/sql/expressions/Aggregator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/39973970/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala index b0cd32b..65117d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Encoder, TypedColumn} * def zero: Int = 0 * def reduce(b: Int, a: Data): Int = b + a.i * def merge(b1: Int, b2: Int): Int = b1 + b2 - * def present(r: Int): Int = r + * def finish(r: Int): Int = r * }.toColumn() * * val ds: Dataset[Data] = ...
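Because the scaladoc above is only a fragment, here is a fleshed-out sketch of how the corrected example is meant to be used. It assumes the 1.6-era API, where toColumn resolves the buffer and output encoders implicitly (hence the sqlContext.implicits._ import in a spark-shell); treat it as an illustration rather than the documented example itself.

```scala
// Spark 1.6-era spark-shell sketch: a typed Aggregator that sums Data.i.
// Note the method is finish, not present; that is exactly the doc fix here.
import sqlContext.implicits._
import org.apache.spark.sql.expressions.Aggregator

case class Data(i: Int)

val sumOfI = new Aggregator[Data, Int, Int] {
  def zero: Int = 0                          // initial buffer value
  def reduce(b: Int, a: Data): Int = b + a.i // fold one input into the buffer
  def merge(b1: Int, b2: Int): Int = b1 + b2 // combine partial buffers
  def finish(r: Int): Int = r                // turn the final buffer into the output
}.toColumn

val ds = Seq(Data(1), Data(2), Data(3)).toDS()
ds.select(sumOfI).show() // expected single value: 6
```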
spark git commit: Fix Aggregator documentation (rename present to finish).
Repository: spark Updated Branches: refs/heads/master 4e81783e9 -> ecac28354 Fix Aggregator documentation (rename present to finish). Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecac2835 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecac2835 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecac2835 Branch: refs/heads/master Commit: ecac2835458bbf73fe59413d5bf921500c5b987d Parents: 4e81783 Author: Reynold Xin Authored: Wed Nov 25 13:45:41 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 13:45:41 2015 -0800 -- .../main/scala/org/apache/spark/sql/expressions/Aggregator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ecac2835/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala index b0cd32b..65117d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Encoder, TypedColumn} * def zero: Int = 0 * def reduce(b: Int, a: Data): Int = b + a.i * def merge(b1: Int, b2: Int): Int = b1 + b2 - * def present(r: Int): Int = r + * def finish(r: Int): Int = r * }.toColumn() * * val ds: Dataset[Data] = ...
spark git commit: [SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up.
Repository: spark Updated Branches: refs/heads/branch-1.6 849ddb6ae -> cd86d8c74 [SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up. This change does a couple of different things to make sure that the RpcEnv-level code and the network library agree about the status of outstanding RPCs. For RPCs that do not expect a reply ("RpcEnv.send"), support for one way messages (hello CORBA!) was added to the network layer. This is a "fire and forget" message that does not require any state to be kept by the TransportClient; as a result, the RpcEnv 'Ack' message is not needed anymore. For RPCs that do expect a reply ("RpcEnv.ask"), the network library now returns the internal RPC id; if the RpcEnv layer decides to time out the RPC before the network layer does, it now asks the TransportClient to forget about the RPC, so that if the network-level timeout occurs, the client is not killed. As part of implementing the above, I cleaned up some of the code in the netty rpc backend, removing types that were not necessary and factoring out some common code. Of interest is a slight change in the exceptions when posting messages to a stopped RpcEnv; that's mostly to avoid nasty error messages from the local-cluster backend when shutting down, which pollutes the terminal output. Author: Marcelo Vanzin Closes #9917 from vanzin/SPARK-11866. (cherry picked from commit 4e81783e92f464d479baaf93eccc3adb1496989a) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd86d8c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd86d8c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd86d8c7 Branch: refs/heads/branch-1.6 Commit: cd86d8c745b745b120ec00cd466ac6cdb05296a6 Parents: 849ddb6 Author: Marcelo Vanzin Authored: Wed Nov 25 12:58:18 2015 -0800 Committer: Marcelo Vanzin Committed: Wed Nov 25 12:58:30 2015 -0800 -- .../spark/deploy/worker/ExecutorRunner.scala| 6 +- .../org/apache/spark/rpc/netty/Dispatcher.scala | 55 +++ .../org/apache/spark/rpc/netty/Inbox.scala | 28 ++-- .../spark/rpc/netty/NettyRpcCallContext.scala | 35 + .../apache/spark/rpc/netty/NettyRpcEnv.scala| 153 --- .../org/apache/spark/rpc/netty/Outbox.scala | 64 ++-- .../org/apache/spark/rpc/netty/InboxSuite.scala | 6 +- .../spark/rpc/netty/NettyRpcHandlerSuite.scala | 2 +- .../spark/network/client/TransportClient.java | 34 - .../apache/spark/network/protocol/Message.java | 4 +- .../spark/network/protocol/MessageDecoder.java | 3 + .../spark/network/protocol/OneWayMessage.java | 75 + .../spark/network/sasl/SaslRpcHandler.java | 5 + .../apache/spark/network/server/RpcHandler.java | 36 + .../network/server/TransportRequestHandler.java | 18 ++- .../org/apache/spark/network/ProtocolSuite.java | 2 + .../spark/network/RpcIntegrationSuite.java | 31 .../spark/network/sasl/SparkSaslSuite.java | 9 ++ 18 files changed, 374 insertions(+), 192 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cd86d8c7/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 3aef051..25a1747 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -92,7 +92,11 @@ private[deploy] class ExecutorRunner( process.destroy() exitCode = Some(process.waitFor()) } 
-worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode)) +try { + worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode)) +} catch { + case e: IllegalStateException => logWarning(e.getMessage(), e) +} } /** Stop this executor runner, including killing the process it launched */ http://git-wip-us.apache.org/repos/asf/spark/blob/cd86d8c7/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index eb25d6c..533c984 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -106,44 +106,30 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val iter = endpoints.keySet().iterator() while (iter.hasNext) { val name = iter.next - post
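The bookkeeping idea behind the "ask" changes is worth spelling out: both the RpcEnv layer and the network layer can time an RPC out, and whichever side gives up first must also deregister the outstanding request so that a late completion or error from the other side is dropped quietly instead of killing the client. Below is a generic, self-contained sketch of that pattern; it does not use Spark's TransportClient or NettyRpcEnv, and all names are illustrative.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit, TimeoutException}

import scala.concurrent.{Future, Promise}

// Illustrative only: a map of outstanding request ids, where timing out a request
// also forgets it, so a later network-level reply or error for that id is ignored.
object PendingRpcSketch {
  private val pending = new ConcurrentHashMap[Long, Promise[Array[Byte]]]()
  private val timeoutScheduler = Executors.newSingleThreadScheduledExecutor()

  def register(requestId: Long, timeoutMs: Long): Future[Array[Byte]] = {
    val promise = Promise[Array[Byte]]()
    pending.put(requestId, promise)
    timeoutScheduler.schedule(new Runnable {
      override def run(): Unit = {
        val timedOut = pending.remove(requestId) // "forget" the RPC on caller-side timeout
        if (timedOut != null) {
          timedOut.tryFailure(new TimeoutException(s"RPC $requestId timed out after $timeoutMs ms"))
        }
      }
    }, timeoutMs, TimeUnit.MILLISECONDS)
    promise.future
  }

  // Called from the network layer; a missing entry means the caller already gave up.
  def complete(requestId: Long, response: Array[Byte]): Unit = {
    val promise = pending.remove(requestId)
    if (promise != null) promise.trySuccess(response)
  }
}
```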
spark git commit: [SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up.
Repository: spark Updated Branches: refs/heads/master d29e2ef4c -> 4e81783e9 [SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up. This change does a couple of different things to make sure that the RpcEnv-level code and the network library agree about the status of outstanding RPCs. For RPCs that do not expect a reply ("RpcEnv.send"), support for one way messages (hello CORBA!) was added to the network layer. This is a "fire and forget" message that does not require any state to be kept by the TransportClient; as a result, the RpcEnv 'Ack' message is not needed anymore. For RPCs that do expect a reply ("RpcEnv.ask"), the network library now returns the internal RPC id; if the RpcEnv layer decides to time out the RPC before the network layer does, it now asks the TransportClient to forget about the RPC, so that if the network-level timeout occurs, the client is not killed. As part of implementing the above, I cleaned up some of the code in the netty rpc backend, removing types that were not necessary and factoring out some common code. Of interest is a slight change in the exceptions when posting messages to a stopped RpcEnv; that's mostly to avoid nasty error messages from the local-cluster backend when shutting down, which pollutes the terminal output. Author: Marcelo Vanzin Closes #9917 from vanzin/SPARK-11866. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e81783e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e81783e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e81783e Branch: refs/heads/master Commit: 4e81783e92f464d479baaf93eccc3adb1496989a Parents: d29e2ef Author: Marcelo Vanzin Authored: Wed Nov 25 12:58:18 2015 -0800 Committer: Marcelo Vanzin Committed: Wed Nov 25 12:58:18 2015 -0800 -- .../spark/deploy/worker/ExecutorRunner.scala| 6 +- .../org/apache/spark/rpc/netty/Dispatcher.scala | 55 +++ .../org/apache/spark/rpc/netty/Inbox.scala | 28 ++-- .../spark/rpc/netty/NettyRpcCallContext.scala | 35 + .../apache/spark/rpc/netty/NettyRpcEnv.scala| 153 --- .../org/apache/spark/rpc/netty/Outbox.scala | 64 ++-- .../org/apache/spark/rpc/netty/InboxSuite.scala | 6 +- .../spark/rpc/netty/NettyRpcHandlerSuite.scala | 2 +- .../spark/network/client/TransportClient.java | 34 - .../apache/spark/network/protocol/Message.java | 4 +- .../spark/network/protocol/MessageDecoder.java | 3 + .../spark/network/protocol/OneWayMessage.java | 75 + .../spark/network/sasl/SaslRpcHandler.java | 5 + .../apache/spark/network/server/RpcHandler.java | 36 + .../network/server/TransportRequestHandler.java | 18 ++- .../org/apache/spark/network/ProtocolSuite.java | 2 + .../spark/network/RpcIntegrationSuite.java | 31 .../spark/network/sasl/SparkSaslSuite.java | 9 ++ 18 files changed, 374 insertions(+), 192 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4e81783e/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 3aef051..25a1747 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -92,7 +92,11 @@ private[deploy] class ExecutorRunner( process.destroy() exitCode = Some(process.waitFor()) } -worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode)) +try { + 
worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode)) +} catch { + case e: IllegalStateException => logWarning(e.getMessage(), e) +} } /** Stop this executor runner, including killing the process it launched */ http://git-wip-us.apache.org/repos/asf/spark/blob/4e81783e/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index eb25d6c..533c984 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -106,44 +106,30 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val iter = endpoints.keySet().iterator() while (iter.hasNext) { val name = iter.next - postMessage( -name, -_ => message, -() => { logWarning(s"Drop $message because $name has
spark git commit: [SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java
Repository: spark Updated Branches: refs/heads/master 88875d941 -> d29e2ef4c [SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java The Python exception track in TransformFunction and TransformFunctionSerializer is not sent back to Java. Py4j just throws a very general exception, which is hard to debug. This PRs adds `getFailure` method to get the failure message in Java side. Author: Shixiong Zhu Closes #9922 from zsxwing/SPARK-11935. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d29e2ef4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d29e2ef4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d29e2ef4 Branch: refs/heads/master Commit: d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0 Parents: 88875d9 Author: Shixiong Zhu Authored: Wed Nov 25 11:47:21 2015 -0800 Committer: Tathagata Das Committed: Wed Nov 25 11:47:21 2015 -0800 -- python/pyspark/streaming/tests.py | 82 +++- python/pyspark/streaming/util.py| 29 --- .../streaming/api/python/PythonDStream.scala| 52 ++--- 3 files changed, 144 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d29e2ef4/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a0e0267..d380d69 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -404,17 +404,69 @@ class BasicOperationTests(PySparkStreamingTestCase): self._test_func(input, func, expected) def test_failed_func(self): +# Test failure in +# TransformFunction.apply(rdd: Option[RDD[_]], time: Time) input = [self.sc.parallelize([d], 1) for d in range(4)] input_stream = self.ssc.queueStream(input) def failed_func(i): -raise ValueError("failed") +raise ValueError("This is a special error") input_stream.map(failed_func).pprint() self.ssc.start() try: self.ssc.awaitTerminationOrTimeout(10) except: +import traceback +failure = traceback.format_exc() +self.assertTrue("This is a special error" in failure) +return + +self.fail("a failed func should throw an error") + +def test_failed_func2(self): +# Test failure in +# TransformFunction.apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time) +input = [self.sc.parallelize([d], 1) for d in range(4)] +input_stream1 = self.ssc.queueStream(input) +input_stream2 = self.ssc.queueStream(input) + +def failed_func(rdd1, rdd2): +raise ValueError("This is a special error") + +input_stream1.transformWith(failed_func, input_stream2, True).pprint() +self.ssc.start() +try: +self.ssc.awaitTerminationOrTimeout(10) +except: +import traceback +failure = traceback.format_exc() +self.assertTrue("This is a special error" in failure) +return + +self.fail("a failed func should throw an error") + +def test_failed_func_with_reseting_failure(self): +input = [self.sc.parallelize([d], 1) for d in range(4)] +input_stream = self.ssc.queueStream(input) + +def failed_func(i): +if i == 1: +# Make it fail in the second batch +raise ValueError("This is a special error") +else: +return i + +# We should be able to see the results of the 3rd and 4th batches even if the second batch +# fails +expected = [[0], [2], [3]] +self.assertEqual(expected, self._collect(input_stream.map(failed_func), 3)) +try: +self.ssc.awaitTerminationOrTimeout(10) +except: +import traceback +failure = traceback.format_exc() +self.assertTrue("This is a special error" in failure) return self.fail("a failed func should throw an error") @@ -780,6 +832,34 @@ 
class CheckpointTests(unittest.TestCase): if self.cpd is not None: shutil.rmtree(self.cpd) +def test_transform_function_serializer_failure(self): +inputd = tempfile.mkdtemp() +self.cpd = tempfile.mkdtemp("test_transform_function_serializer_failure") + +def setup(): +conf = SparkConf().set("spark.default.parallelism", 1) +sc = SparkContext(conf=conf) +ssc = StreamingContext(sc, 0.5) + +# A function that cannot be serialized +def process(time, rdd): +
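The mechanism is easier to see in isolation: the wrapper around the user's transform function records the traceback of any failure it observes, and the JVM side can then ask for that recorded message (the new getFailure hook) instead of surfacing a generic Py4J error. A Scala sketch of that record-and-expose pattern follows; it is not the actual PythonDStream/TransformFunction code, and the names are illustrative.

```scala
import java.io.{PrintWriter, StringWriter}

// Illustrative wrapper: remembers the last failure from a callback that crosses a
// process or language boundary, so the caller can later fetch the real stack trace.
class FailureRecordingFunc[A, B](f: A => B) extends (A => B) {
  @volatile private var lastFailure: String = null

  override def apply(a: A): B =
    try f(a)
    catch {
      case e: Throwable =>
        val sw = new StringWriter()
        e.printStackTrace(new PrintWriter(sw))
        lastFailure = sw.toString // kept so getFailure can surface it with full detail
        throw e
    }

  /** Returns the recorded failure and clears it, or null if nothing failed. */
  def getFailure: String = {
    val failure = lastFailure
    lastFailure = null
    failure
  }
}
```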
spark git commit: [SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java
Repository: spark Updated Branches: refs/heads/branch-1.6 b4cf318ab -> 849ddb6ae [SPARK-11935][PYSPARK] Send the Python exceptions in TransformFunction and TransformFunctionSerializer to Java The Python exception track in TransformFunction and TransformFunctionSerializer is not sent back to Java. Py4j just throws a very general exception, which is hard to debug. This PRs adds `getFailure` method to get the failure message in Java side. Author: Shixiong Zhu Closes #9922 from zsxwing/SPARK-11935. (cherry picked from commit d29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/849ddb6a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/849ddb6a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/849ddb6a Branch: refs/heads/branch-1.6 Commit: 849ddb6ae69416434173824d50d59ebc8b4dbbf5 Parents: b4cf318 Author: Shixiong Zhu Authored: Wed Nov 25 11:47:21 2015 -0800 Committer: Tathagata Das Committed: Wed Nov 25 11:47:33 2015 -0800 -- python/pyspark/streaming/tests.py | 82 +++- python/pyspark/streaming/util.py| 29 --- .../streaming/api/python/PythonDStream.scala| 52 ++--- 3 files changed, 144 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/849ddb6a/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a0e0267..d380d69 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -404,17 +404,69 @@ class BasicOperationTests(PySparkStreamingTestCase): self._test_func(input, func, expected) def test_failed_func(self): +# Test failure in +# TransformFunction.apply(rdd: Option[RDD[_]], time: Time) input = [self.sc.parallelize([d], 1) for d in range(4)] input_stream = self.ssc.queueStream(input) def failed_func(i): -raise ValueError("failed") +raise ValueError("This is a special error") input_stream.map(failed_func).pprint() self.ssc.start() try: self.ssc.awaitTerminationOrTimeout(10) except: +import traceback +failure = traceback.format_exc() +self.assertTrue("This is a special error" in failure) +return + +self.fail("a failed func should throw an error") + +def test_failed_func2(self): +# Test failure in +# TransformFunction.apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time) +input = [self.sc.parallelize([d], 1) for d in range(4)] +input_stream1 = self.ssc.queueStream(input) +input_stream2 = self.ssc.queueStream(input) + +def failed_func(rdd1, rdd2): +raise ValueError("This is a special error") + +input_stream1.transformWith(failed_func, input_stream2, True).pprint() +self.ssc.start() +try: +self.ssc.awaitTerminationOrTimeout(10) +except: +import traceback +failure = traceback.format_exc() +self.assertTrue("This is a special error" in failure) +return + +self.fail("a failed func should throw an error") + +def test_failed_func_with_reseting_failure(self): +input = [self.sc.parallelize([d], 1) for d in range(4)] +input_stream = self.ssc.queueStream(input) + +def failed_func(i): +if i == 1: +# Make it fail in the second batch +raise ValueError("This is a special error") +else: +return i + +# We should be able to see the results of the 3rd and 4th batches even if the second batch +# fails +expected = [[0], [2], [3]] +self.assertEqual(expected, self._collect(input_stream.map(failed_func), 3)) +try: +self.ssc.awaitTerminationOrTimeout(10) +except: +import traceback +failure = traceback.format_exc() +self.assertTrue("This is 
a special error" in failure) return self.fail("a failed func should throw an error") @@ -780,6 +832,34 @@ class CheckpointTests(unittest.TestCase): if self.cpd is not None: shutil.rmtree(self.cpd) +def test_transform_function_serializer_failure(self): +inputd = tempfile.mkdtemp() +self.cpd = tempfile.mkdtemp("test_transform_function_serializer_failure") + +def setup(): +conf = SparkConf().set("spark.default.parallelism", 1) +sc = SparkContext(conf=conf) +ssc = StreamingContext(s
spark git commit: [SPARK-10558][CORE] Fix wrong executor state in Master
Repository: spark Updated Branches: refs/heads/master 9f3e59a16 -> 88875d941 [SPARK-10558][CORE] Fix wrong executor state in Master `ExecutorAdded` can only be sent to `AppClient` when worker report back the executor state as `LOADING`, otherwise because of concurrency issue, `AppClient` will possibly receive `ExectuorAdded` at first, then `ExecutorStateUpdated` with `LOADING` state. Also Master will change the executor state from `LAUNCHING` to `RUNNING` (`AppClient` report back the state as `RUNNING`), then to `LOADING` (worker report back to state as `LOADING`), it should be `LAUNCHING` -> `LOADING` -> `RUNNING`. Also it is wrongly shown in master UI, the state of executor should be `RUNNING` rather than `LOADING`: ![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png) Author: jerryshao Closes #8714 from jerryshao/SPARK-10558. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88875d94 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88875d94 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88875d94 Branch: refs/heads/master Commit: 88875d9413ec7d64a88d40857ffcf97b5853a7f2 Parents: 9f3e59a Author: jerryshao Authored: Wed Nov 25 11:42:53 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:42:53 2015 -0800 -- .../scala/org/apache/spark/deploy/ExecutorState.scala | 2 +- .../org/apache/spark/deploy/client/AppClient.scala| 3 --- .../scala/org/apache/spark/deploy/master/Master.scala | 14 +++--- .../scala/org/apache/spark/deploy/worker/Worker.scala | 2 +- 4 files changed, 13 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala index efa88c6..69c98e2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy private[deploy] object ExecutorState extends Enumeration { - val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value + val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value type ExecutorState = Value http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index afab362..df6ba7d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -178,9 +178,6 @@ private[spark] class AppClient( val fullId = appId + "/" + id logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores)) -// FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not -// guaranteed), `ExecutorStateChanged` may be sent to a dead master. 
-sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None)) listener.executorAdded(fullId, workerId, hostPort, cores, memory) case ExecutorUpdated(id, state, message, exitStatus) => http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b25a487..9952c97 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -253,9 +253,17 @@ private[deploy] class Master( execOption match { case Some(exec) => { val appInfo = idToApp(appId) + val oldState = exec.state exec.state = state - if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() } + + if (state == ExecutorState.RUNNING) { +assert(oldState == ExecutorState.LAUNCHING, + s"executor $execId state transfer from $oldState to RUNNING is illegal") +appInfo.resetRetryCount() + } + exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus)) + if (ExecutorState.
spark git commit: [SPARK-10558][CORE] Fix wrong executor state in Master
Repository: spark Updated Branches: refs/heads/branch-1.6 7b720bf1c -> b4cf318ab [SPARK-10558][CORE] Fix wrong executor state in Master `ExecutorAdded` can only be sent to `AppClient` when worker report back the executor state as `LOADING`, otherwise because of concurrency issue, `AppClient` will possibly receive `ExectuorAdded` at first, then `ExecutorStateUpdated` with `LOADING` state. Also Master will change the executor state from `LAUNCHING` to `RUNNING` (`AppClient` report back the state as `RUNNING`), then to `LOADING` (worker report back to state as `LOADING`), it should be `LAUNCHING` -> `LOADING` -> `RUNNING`. Also it is wrongly shown in master UI, the state of executor should be `RUNNING` rather than `LOADING`: ![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png) Author: jerryshao Closes #8714 from jerryshao/SPARK-10558. (cherry picked from commit 88875d9413ec7d64a88d40857ffcf97b5853a7f2) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4cf318a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4cf318a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4cf318a Branch: refs/heads/branch-1.6 Commit: b4cf318ab9e75f9fb8e4e15f60d263889189b445 Parents: 7b720bf Author: jerryshao Authored: Wed Nov 25 11:42:53 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:43:00 2015 -0800 -- .../scala/org/apache/spark/deploy/ExecutorState.scala | 2 +- .../org/apache/spark/deploy/client/AppClient.scala| 3 --- .../scala/org/apache/spark/deploy/master/Master.scala | 14 +++--- .../scala/org/apache/spark/deploy/worker/Worker.scala | 2 +- 4 files changed, 13 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b4cf318a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala index efa88c6..69c98e2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy private[deploy] object ExecutorState extends Enumeration { - val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value + val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value type ExecutorState = Value http://git-wip-us.apache.org/repos/asf/spark/blob/b4cf318a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index afab362..df6ba7d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -178,9 +178,6 @@ private[spark] class AppClient( val fullId = appId + "/" + id logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores)) -// FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not -// guaranteed), `ExecutorStateChanged` may be sent to a dead master. 
-sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None)) listener.executorAdded(fullId, workerId, hostPort, cores, memory) case ExecutorUpdated(id, state, message, exitStatus) => http://git-wip-us.apache.org/repos/asf/spark/blob/b4cf318a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b25a487..9952c97 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -253,9 +253,17 @@ private[deploy] class Master( execOption match { case Some(exec) => { val appInfo = idToApp(appId) + val oldState = exec.state exec.state = state - if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() } + + if (state == ExecutorState.RUNNING) { +assert(oldState == ExecutorState.LAUNCHING, + s"executor $execId state transfer from $oldState to RUNNING is illegal") +appInfo.resetRetryCount() + } + exec.appli
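The invariant both copies of this patch enforce in `Master.scala` — with `LOADING` removed, an executor may only reach `RUNNING` from `LAUNCHING` — can be sketched in isolation. The types below are local stand-ins for the sketch, not the real `Master`/executor-info classes:

```scala
// Stand-alone illustration of the guarded state transition added above.
object ExecutorStateSketch extends Enumeration {
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
}

class ExecutorInfoSketch {
  import ExecutorStateSketch._
  var state: Value = LAUNCHING

  def updateState(newState: Value): Unit = {
    val oldState = state
    state = newState
    if (newState == RUNNING) {
      // Same invariant as the assert in the patch: only a launching executor may report RUNNING.
      assert(oldState == LAUNCHING, s"state transfer from $oldState to RUNNING is illegal")
    }
  }
}

object ExecutorStateDemo {
  def main(args: Array[String]): Unit = {
    val exec = new ExecutorInfoSketch
    exec.updateState(ExecutorStateSketch.RUNNING) // legal: LAUNCHING -> RUNNING
  }
}
```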
spark git commit: [SPARK-11880][WINDOWS][SPARK SUBMIT] bin/load-spark-env.cmd loads spark-env.cmd from wrong directory
Repository: spark Updated Branches: refs/heads/master 83653ac5e -> 9f3e59a16 [SPARK-11880][WINDOWS][SPARK SUBMIT] bin/load-spark-env.cmd loads spark-env.cmd from wrong directory * On windows the `bin/load-spark-env.cmd` tries to load `spark-env.cmd` from `%~dp0..\..\conf`, where `~dp0` points to `bin` and `conf` is only one level up. * Updated `bin/load-spark-env.cmd` to load `spark-env.cmd` from `%~dp0..\conf`, instead of `%~dp0..\..\conf` Author: wangt Closes #9863 from toddwan/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f3e59a1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f3e59a1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f3e59a1 Branch: refs/heads/master Commit: 9f3e59a16822fb61d60cf103bd4f7823552939c6 Parents: 83653ac Author: wangt Authored: Wed Nov 25 11:41:05 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:41:05 2015 -0800 -- bin/load-spark-env.cmd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9f3e59a1/bin/load-spark-env.cmd -- diff --git a/bin/load-spark-env.cmd b/bin/load-spark-env.cmd index 36d932c..59080ed 100644 --- a/bin/load-spark-env.cmd +++ b/bin/load-spark-env.cmd @@ -27,7 +27,7 @@ if [%SPARK_ENV_LOADED%] == [] ( if not [%SPARK_CONF_DIR%] == [] ( set user_conf_dir=%SPARK_CONF_DIR% ) else ( -set user_conf_dir=%~dp0..\..\conf +set user_conf_dir=%~dp0..\conf ) call :LoadSparkEnv - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11880][WINDOWS][SPARK SUBMIT] bin/load-spark-env.cmd loads spark-env.cmd from wrong directory
Repository: spark Updated Branches: refs/heads/branch-1.6 97317d346 -> 7b720bf1c [SPARK-11880][WINDOWS][SPARK SUBMIT] bin/load-spark-env.cmd loads spark-env.cmd from wrong directory * On windows the `bin/load-spark-env.cmd` tries to load `spark-env.cmd` from `%~dp0..\..\conf`, where `~dp0` points to `bin` and `conf` is only one level up. * Updated `bin/load-spark-env.cmd` to load `spark-env.cmd` from `%~dp0..\conf`, instead of `%~dp0..\..\conf` Author: wangt Closes #9863 from toddwan/master. (cherry picked from commit 9f3e59a16822fb61d60cf103bd4f7823552939c6) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b720bf1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b720bf1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b720bf1 Branch: refs/heads/branch-1.6 Commit: 7b720bf1c4860fdb560540744c5fdfb333b752bc Parents: 97317d3 Author: wangt Authored: Wed Nov 25 11:41:05 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:41:12 2015 -0800 -- bin/load-spark-env.cmd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7b720bf1/bin/load-spark-env.cmd -- diff --git a/bin/load-spark-env.cmd b/bin/load-spark-env.cmd index 36d932c..59080ed 100644 --- a/bin/load-spark-env.cmd +++ b/bin/load-spark-env.cmd @@ -27,7 +27,7 @@ if [%SPARK_ENV_LOADED%] == [] ( if not [%SPARK_CONF_DIR%] == [] ( set user_conf_dir=%SPARK_CONF_DIR% ) else ( -set user_conf_dir=%~dp0..\..\conf +set user_conf_dir=%~dp0..\conf ) call :LoadSparkEnv - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
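For readers not used to Windows batch syntax: `%~dp0` expands to the directory containing the running script (here `bin\`), and `conf\` is a sibling of `bin\`, so the corrected lookup goes one level up rather than two. A small JVM-side sketch of the same path arithmetic (the relative `spark/` prefix stands in for `SPARK_HOME` and is purely illustrative):

```scala
import java.nio.file.Paths

// Illustration of the directory-layout reasoning above, not the .cmd script itself.
object ConfDirSketch {
  def main(args: Array[String]): Unit = {
    val scriptDir = Paths.get("spark", "bin")                   // what %~dp0 expands to
    val oldLookup = scriptDir.resolve("../../conf").normalize() // escapes SPARK_HOME entirely
    val newLookup = scriptDir.resolve("../conf").normalize()    // the sibling conf directory
    println(s"before the fix: $oldLookup")                      // conf
    println(s"after the fix:  $newLookup")                      // spark/conf
  }
}
```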
spark git commit: [SPARK-10864][WEB UI] app name is hidden if window is resized
Repository: spark Updated Branches: refs/heads/master 67b673208 -> 83653ac5e [SPARK-10864][WEB UI] app name is hidden if window is resized Currently the Web UI navbar has a minimum width of 1200px; so if a window is resized smaller than that the app name goes off screen. The 1200px width seems to have been chosen since it fits the longest example app name without wrapping. To work with smaller window widths I made the tabs wrap since it looked better than wrapping the app name. This is a distinct change in how the navbar looks and I'm not sure if it's what we actually want to do. Other notes: - min-width set to 600px to keep the tabs from wrapping individually (will need to be adjusted if tabs are added) - app name will also wrap (making three levels) if a really really long app name is used Author: Alex Bozarth Closes #9874 from ajbozarth/spark10864. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83653ac5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83653ac5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83653ac5 Branch: refs/heads/master Commit: 83653ac5e71996c5a366a42170bed316b208f1b5 Parents: 67b6732 Author: Alex Bozarth Authored: Wed Nov 25 11:39:00 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:39:00 2015 -0800 -- core/src/main/resources/org/apache/spark/ui/static/webui.css | 8 ++-- core/src/main/scala/org/apache/spark/ui/UIUtils.scala| 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/83653ac5/core/src/main/resources/org/apache/spark/ui/static/webui.css -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 04f3070..c628a0c 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -16,14 +16,9 @@ */ .navbar { - height: 50px; font-size: 15px; margin-bottom: 15px; - min-width: 1200px -} - -.navbar .navbar-inner { - height: 50px; + min-width: 600px; } .navbar .brand { @@ -46,6 +41,7 @@ .navbar-text { height: 50px; line-height: 3.3; + white-space: nowrap; } table.sortable thead { http://git-wip-us.apache.org/repos/asf/spark/blob/83653ac5/core/src/main/scala/org/apache/spark/ui/UIUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 84a1116..1e8194f 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -210,10 +210,10 @@ private[spark] object UIUtils extends Logging { {org.apache.spark.SPARK_VERSION} -{header} {shortAppName} application UI +{header} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10864][WEB UI] app name is hidden if window is resized
Repository: spark Updated Branches: refs/heads/branch-1.6 448208d0e -> 97317d346 [SPARK-10864][WEB UI] app name is hidden if window is resized Currently the Web UI navbar has a minimum width of 1200px; so if a window is resized smaller than that the app name goes off screen. The 1200px width seems to have been chosen since it fits the longest example app name without wrapping. To work with smaller window widths I made the tabs wrap since it looked better than wrapping the app name. This is a distinct change in how the navbar looks and I'm not sure if it's what we actually want to do. Other notes: - min-width set to 600px to keep the tabs from wrapping individually (will need to be adjusted if tabs are added) - app name will also wrap (making three levels) if a really really long app name is used Author: Alex Bozarth Closes #9874 from ajbozarth/spark10864. (cherry picked from commit 83653ac5e71996c5a366a42170bed316b208f1b5) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97317d34 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97317d34 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97317d34 Branch: refs/heads/branch-1.6 Commit: 97317d346e9cab6b2f33123a79f33ccd253a99fd Parents: 448208d Author: Alex Bozarth Authored: Wed Nov 25 11:39:00 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:39:11 2015 -0800 -- core/src/main/resources/org/apache/spark/ui/static/webui.css | 8 ++-- core/src/main/scala/org/apache/spark/ui/UIUtils.scala| 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/97317d34/core/src/main/resources/org/apache/spark/ui/static/webui.css -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 04f3070..c628a0c 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -16,14 +16,9 @@ */ .navbar { - height: 50px; font-size: 15px; margin-bottom: 15px; - min-width: 1200px -} - -.navbar .navbar-inner { - height: 50px; + min-width: 600px; } .navbar .brand { @@ -46,6 +41,7 @@ .navbar-text { height: 50px; line-height: 3.3; + white-space: nowrap; } table.sortable thead { http://git-wip-us.apache.org/repos/asf/spark/blob/97317d34/core/src/main/scala/org/apache/spark/ui/UIUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 84a1116..1e8194f 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -210,10 +210,10 @@ private[spark] object UIUtils extends Logging { {org.apache.spark.SPARK_VERSION} -{header} {shortAppName} application UI +{header} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [DOCUMENTATION] Fix minor doc error
Repository: spark Updated Branches: refs/heads/master 0dee44a66 -> 67b673208 [DOCUMENTATION] Fix minor doc error Author: Jeff Zhang Closes #9956 from zjffdu/dev_typo. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67b67320 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67b67320 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67b67320 Branch: refs/heads/master Commit: 67b67320884282ccf3102e2af96f877e9b186517 Parents: 0dee44a Author: Jeff Zhang Authored: Wed Nov 25 11:37:42 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:37:42 2015 -0800 -- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/67b67320/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index 4de202d..741d6b2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -35,7 +35,7 @@ val sc = new SparkContext(conf) {% endhighlight %} Note that we can have more than 1 thread in local mode, and in cases like Spark Streaming, we may -actually require one to prevent any sort of starvation issues. +actually require more than 1 thread to prevent any sort of starvation issues. Properties that specify some time duration should be configured with a unit of time. The following format is accepted: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [DOCUMENTATION] Fix minor doc error
Repository: spark Updated Branches: refs/heads/branch-1.6 685b9c2f5 -> 448208d0e [DOCUMENTATION] Fix minor doc error Author: Jeff Zhang Closes #9956 from zjffdu/dev_typo. (cherry picked from commit 67b67320884282ccf3102e2af96f877e9b186517) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/448208d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/448208d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/448208d0 Branch: refs/heads/branch-1.6 Commit: 448208d0e86963b7f3ee0e507967512240546871 Parents: 685b9c2 Author: Jeff Zhang Authored: Wed Nov 25 11:37:42 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:38:08 2015 -0800 -- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/448208d0/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index c496146..77f1006 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -35,7 +35,7 @@ val sc = new SparkContext(conf) {% endhighlight %} Note that we can have more than 1 thread in local mode, and in cases like Spark Streaming, we may -actually require one to prevent any sort of starvation issues. +actually require more than 1 thread to prevent any sort of starvation issues. Properties that specify some time duration should be configured with a unit of time. The following format is accepted: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
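The sentence being corrected is easy to see in practice: a Spark Streaming receiver pins one thread of a local master, so a single-threaded `local` master leaves nothing to process the received batches. A minimal driver showing the recommended setting — standard public API, with the socket source address purely hypothetical:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalThreadsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("LocalThreadsExample")
      .setMaster("local[2]") // not "local": the receiver below would consume the only thread

    val ssc = new StreamingContext(conf, Seconds(1))
    // Hypothetical source for the example; its receiver occupies one of the two local threads.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```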
spark git commit: [MINOR] Remove unnecessary spaces in `include_example.rb`
Repository: spark Updated Branches: refs/heads/branch-1.6 c7db01b20 -> 685b9c2f5 [MINOR] Remove unnecessary spaces in `include_example.rb` Author: Yu ISHIKAWA Closes #9960 from yu-iskw/minor-remove-spaces. (cherry picked from commit 0dee44a6646daae0cc03dbc32125e080dff0f4ae) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/685b9c2f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/685b9c2f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/685b9c2f Branch: refs/heads/branch-1.6 Commit: 685b9c2f5a68cc77e9494a39b0cb25b6c0b239b3 Parents: c7db01b Author: Yu ISHIKAWA Authored: Wed Nov 25 11:35:52 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:36:06 2015 -0800 -- docs/_plugins/include_example.rb | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/685b9c2f/docs/_plugins/include_example.rb -- diff --git a/docs/_plugins/include_example.rb b/docs/_plugins/include_example.rb index 549f81f..564c866 100644 --- a/docs/_plugins/include_example.rb +++ b/docs/_plugins/include_example.rb @@ -20,12 +20,12 @@ require 'pygments' module Jekyll class IncludeExampleTag < Liquid::Tag - + def initialize(tag_name, markup, tokens) @markup = markup super end - + def render(context) site = context.registers[:site] config_dir = '../examples/src/main' @@ -37,7 +37,7 @@ module Jekyll code = File.open(@file).read.encode("UTF-8") code = select_lines(code) - + rendered_code = Pygments.highlight(code, :lexer => @lang) hint = "Find full example code at " \ @@ -45,7 +45,7 @@ module Jekyll rendered_code + hint end - + # Trim the code block so as to have the same indention, regardless of their positions in the # code file. def trim_codeblock(lines) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR] Remove unnecessary spaces in `include_example.rb`
Repository: spark Updated Branches: refs/heads/master dc1d324fd -> 0dee44a66 [MINOR] Remove unnecessary spaces in `include_example.rb` Author: Yu ISHIKAWA Closes #9960 from yu-iskw/minor-remove-spaces. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0dee44a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0dee44a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0dee44a6 Branch: refs/heads/master Commit: 0dee44a6646daae0cc03dbc32125e080dff0f4ae Parents: dc1d324 Author: Yu ISHIKAWA Authored: Wed Nov 25 11:35:52 2015 -0800 Committer: Andrew Or Committed: Wed Nov 25 11:35:52 2015 -0800 -- docs/_plugins/include_example.rb | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0dee44a6/docs/_plugins/include_example.rb -- diff --git a/docs/_plugins/include_example.rb b/docs/_plugins/include_example.rb index 549f81f..564c866 100644 --- a/docs/_plugins/include_example.rb +++ b/docs/_plugins/include_example.rb @@ -20,12 +20,12 @@ require 'pygments' module Jekyll class IncludeExampleTag < Liquid::Tag - + def initialize(tag_name, markup, tokens) @markup = markup super end - + def render(context) site = context.registers[:site] config_dir = '../examples/src/main' @@ -37,7 +37,7 @@ module Jekyll code = File.open(@file).read.encode("UTF-8") code = select_lines(code) - + rendered_code = Pygments.highlight(code, :lexer => @lang) hint = "Find full example code at " \ @@ -45,7 +45,7 @@ module Jekyll rendered_code + hint end - + # Trim the code block so as to have the same indention, regardless of their positions in the # code file. def trim_codeblock(lines) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for pyspark
Repository: spark Updated Branches: refs/heads/branch-1.6 d5145210b -> c7db01b20 [SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for pyspark Currently, we does not have visualization for SQL query from Python, this PR fix that. cc zsxwing Author: Davies Liu Closes #9949 from davies/pyspark_sql_ui. (cherry picked from commit dc1d324fdf83e9f4b1debfb277533b002691d71f) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7db01b2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7db01b2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7db01b2 Branch: refs/heads/branch-1.6 Commit: c7db01b20b97352697ffb04aee869bea44d8f932 Parents: d514521 Author: Davies Liu Authored: Wed Nov 25 11:11:39 2015 -0800 Committer: Davies Liu Committed: Wed Nov 25 11:11:51 2015 -0800 -- python/pyspark/sql/dataframe.py | 2 +- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 7 +++ .../scala/org/apache/spark/sql/execution/python.scala | 12 +++- 3 files changed, 15 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c7db01b2/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0dd75ba..746bb55 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -277,7 +277,7 @@ class DataFrame(object): [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')] """ with SCCallSiteSync(self._sc) as css: -port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd()) +port = self._jdf.collectToPython() return list(_load_from_socket(port, BatchedSerializer(PickleSerializer( @ignore_unicode_prefix http://git-wip-us.apache.org/repos/asf/spark/blob/c7db01b2/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d8319b9..6197f10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.python.PythonRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ @@ -1735,6 +1736,12 @@ class DataFrame private[sql]( EvaluatePython.javaToPython(rdd) } + protected[sql] def collectToPython(): Int = { +withNewExecutionId { + PythonRDD.collectAndServe(javaToPython.rdd) +} + } + // Deprecated methods http://git-wip-us.apache.org/repos/asf/spark/blob/c7db01b2/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala index d611b00..defcec9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala @@ -121,11 +121,13 @@ object EvaluatePython { def takeAndServe(df: DataFrame, n: Int): Int = { registerPicklers() -val iter = new SerDeUtil.AutoBatchedPickler( - df.queryExecution.executedPlan.executeTake(n).iterator.map { row => -EvaluatePython.toJava(row, df.schema) - }) -PythonRDD.serveIterator(iter, s"serve-DataFrame") +df.withNewExecutionId { + val iter 
= new SerDeUtil.AutoBatchedPickler( +df.queryExecution.executedPlan.executeTake(n).iterator.map { row => + EvaluatePython.toJava(row, df.schema) +}) + PythonRDD.serveIterator(iter, s"serve-DataFrame") +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for pyspark
Repository: spark Updated Branches: refs/heads/master 6b781576a -> dc1d324fd [SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for pyspark Currently, we does not have visualization for SQL query from Python, this PR fix that. cc zsxwing Author: Davies Liu Closes #9949 from davies/pyspark_sql_ui. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc1d324f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc1d324f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc1d324f Branch: refs/heads/master Commit: dc1d324fdf83e9f4b1debfb277533b002691d71f Parents: 6b78157 Author: Davies Liu Authored: Wed Nov 25 11:11:39 2015 -0800 Committer: Davies Liu Committed: Wed Nov 25 11:11:39 2015 -0800 -- python/pyspark/sql/dataframe.py | 2 +- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 7 +++ .../scala/org/apache/spark/sql/execution/python.scala | 12 +++- 3 files changed, 15 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dc1d324f/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0dd75ba..746bb55 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -277,7 +277,7 @@ class DataFrame(object): [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')] """ with SCCallSiteSync(self._sc) as css: -port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd()) +port = self._jdf.collectToPython() return list(_load_from_socket(port, BatchedSerializer(PickleSerializer( @ignore_unicode_prefix http://git-wip-us.apache.org/repos/asf/spark/blob/dc1d324f/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d8319b9..6197f10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.python.PythonRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ @@ -1735,6 +1736,12 @@ class DataFrame private[sql]( EvaluatePython.javaToPython(rdd) } + protected[sql] def collectToPython(): Int = { +withNewExecutionId { + PythonRDD.collectAndServe(javaToPython.rdd) +} + } + // Deprecated methods http://git-wip-us.apache.org/repos/asf/spark/blob/dc1d324f/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala index d611b00..defcec9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala @@ -121,11 +121,13 @@ object EvaluatePython { def takeAndServe(df: DataFrame, n: Int): Int = { registerPicklers() -val iter = new SerDeUtil.AutoBatchedPickler( - df.queryExecution.executedPlan.executeTake(n).iterator.map { row => -EvaluatePython.toJava(row, df.schema) - }) -PythonRDD.serveIterator(iter, s"serve-DataFrame") +df.withNewExecutionId { + val iter = new SerDeUtil.AutoBatchedPickler( +df.queryExecution.executedPlan.executeTake(n).iterator.map { row 
=> + EvaluatePython.toJava(row, df.schema) +}) + PythonRDD.serveIterator(iter, s"serve-DataFrame") +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
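The mechanism is the same in both copies of the patch: the Python-facing collect path is wrapped in `withNewExecutionId` so the work is attributed to a SQL execution and therefore shows up in the SQL tab. A generic sketch of that wrap-the-body pattern — the listener hooks below are placeholders, not Spark's actual event plumbing:

```scala
import java.util.concurrent.atomic.AtomicLong

// Sketch of the "run the body inside an execution scope" pattern used above.
object ExecutionScopeSketch {
  private val nextExecutionId = new AtomicLong(0)

  def withNewExecutionId[T](onStart: Long => Unit, onEnd: Long => Unit)(body: => T): T = {
    val executionId = nextExecutionId.getAndIncrement()
    onStart(executionId)   // e.g. post an execution-start event to the UI listener
    try {
      body                 // the collect/serve work is now attributed to executionId
    } finally {
      onEnd(executionId)   // e.g. post the matching execution-end event
    }
  }

  def main(args: Array[String]): Unit = {
    val result = withNewExecutionId(
      id => println(s"execution $id started"),
      id => println(s"execution $id finished")) {
      (1 to 10).sum
    }
    println(result)
  }
}
```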
spark git commit: [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits
Repository: spark Updated Branches: refs/heads/branch-1.6 204f3601d -> d5145210b [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits deleting the temp dir like that ``` scala> import scala.collection.mutable import scala.collection.mutable scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9) a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98) scala> a.foreach(x => {a.remove(x) }) scala> a.foreach(println(_)) 98 ``` You may not modify a collection while traversing or iterating over it.This can not delete all element of the collection Author: Zhongshuai Pei Closes #9951 from DoingDone9/Bug_RemainDir. (cherry picked from commit 6b781576a15d8d5c5fbed8bef1c5bda95b3d44ac) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5145210 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5145210 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5145210 Branch: refs/heads/branch-1.6 Commit: d5145210bd59072a33b61f15348d5e794f6df4e0 Parents: 204f360 Author: Zhongshuai Pei Authored: Wed Nov 25 10:37:34 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 10:37:42 2015 -0800 -- .../main/scala/org/apache/spark/util/ShutdownHookManager.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d5145210/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index db4a8b3..4012dca 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging { // Add a shutdown hook to delete the temp dirs when the JVM exits addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () => logInfo("Shutdown hook called") -shutdownDeletePaths.foreach { dirPath => +// we need to materialize the paths to delete because deleteRecursively removes items from +// shutdownDeletePaths as we are traversing through it. +shutdownDeletePaths.toArray.foreach { dirPath => try { logInfo("Deleting directory " + dirPath) Utils.deleteRecursively(new File(dirPath)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits
Repository: spark Updated Branches: refs/heads/branch-1.4 94789f374 -> 1df3e8230 [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits deleting the temp dir like that ``` scala> import scala.collection.mutable import scala.collection.mutable scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9) a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98) scala> a.foreach(x => {a.remove(x) }) scala> a.foreach(println(_)) 98 ``` You may not modify a collection while traversing or iterating over it.This can not delete all element of the collection Author: Zhongshuai Pei Closes #9951 from DoingDone9/Bug_RemainDir. (cherry picked from commit 6b781576a15d8d5c5fbed8bef1c5bda95b3d44ac) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1df3e823 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1df3e823 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1df3e823 Branch: refs/heads/branch-1.4 Commit: 1df3e823028c0f1e22e2e61f959ef0343355a757 Parents: 94789f3 Author: Zhongshuai Pei Authored: Wed Nov 25 10:37:34 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 10:38:02 2015 -0800 -- .../main/scala/org/apache/spark/util/ShutdownHookManager.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1df3e823/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index 61ff9b8..091dc03 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging { // Add a shutdown hook to delete the temp dirs when the JVM exits addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () => logInfo("Shutdown hook called") -shutdownDeletePaths.foreach { dirPath => +// we need to materialize the paths to delete because deleteRecursively removes items from +// shutdownDeletePaths as we are traversing through it. +shutdownDeletePaths.toArray.foreach { dirPath => try { logInfo("Deleting directory " + dirPath) Utils.deleteRecursively(new File(dirPath)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits
Repository: spark Updated Branches: refs/heads/branch-1.5 4139a4ed1 -> b1fcefca6 [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits deleting the temp dir like that ``` scala> import scala.collection.mutable import scala.collection.mutable scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9) a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98) scala> a.foreach(x => {a.remove(x) }) scala> a.foreach(println(_)) 98 ``` You may not modify a collection while traversing or iterating over it.This can not delete all element of the collection Author: Zhongshuai Pei Closes #9951 from DoingDone9/Bug_RemainDir. (cherry picked from commit 6b781576a15d8d5c5fbed8bef1c5bda95b3d44ac) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1fcefca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1fcefca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1fcefca Branch: refs/heads/branch-1.5 Commit: b1fcefca6b257f5bb1be85fce2b4a97d4271e0aa Parents: 4139a4e Author: Zhongshuai Pei Authored: Wed Nov 25 10:37:34 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 10:37:51 2015 -0800 -- .../main/scala/org/apache/spark/util/ShutdownHookManager.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b1fcefca/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index 61ff9b8..091dc03 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging { // Add a shutdown hook to delete the temp dirs when the JVM exits addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () => logInfo("Shutdown hook called") -shutdownDeletePaths.foreach { dirPath => +// we need to materialize the paths to delete because deleteRecursively removes items from +// shutdownDeletePaths as we are traversing through it. +shutdownDeletePaths.toArray.foreach { dirPath => try { logInfo("Deleting directory " + dirPath) Utils.deleteRecursively(new File(dirPath)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits
Repository: spark Updated Branches: refs/heads/master faabdfa2b -> 6b781576a [SPARK-11974][CORE] Not all the temp dirs had been deleted when the JVM exits deleting the temp dir like that ``` scala> import scala.collection.mutable import scala.collection.mutable scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9) a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98) scala> a.foreach(x => {a.remove(x) }) scala> a.foreach(println(_)) 98 ``` You may not modify a collection while traversing or iterating over it.This can not delete all element of the collection Author: Zhongshuai Pei Closes #9951 from DoingDone9/Bug_RemainDir. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b781576 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b781576 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b781576 Branch: refs/heads/master Commit: 6b781576a15d8d5c5fbed8bef1c5bda95b3d44ac Parents: faabdfa Author: Zhongshuai Pei Authored: Wed Nov 25 10:37:34 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 10:37:34 2015 -0800 -- .../main/scala/org/apache/spark/util/ShutdownHookManager.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6b781576/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index db4a8b3..4012dca 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging { // Add a shutdown hook to delete the temp dirs when the JVM exits addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () => logInfo("Shutdown hook called") -shutdownDeletePaths.foreach { dirPath => +// we need to materialize the paths to delete because deleteRecursively removes items from +// shutdownDeletePaths as we are traversing through it. +shutdownDeletePaths.toArray.foreach { dirPath => try { logInfo("Deleting directory " + dirPath) Utils.deleteRecursively(new File(dirPath)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
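The REPL session in the commit message already shows the failure mode; the sketch below pairs it with the fix — snapshotting via `.toArray` before mutating — which is exactly what the patched shutdown hook does:

```scala
import scala.collection.mutable

object DeleteAllPathsSketch {
  def main(args: Array[String]): Unit = {
    // Mutating while iterating may silently skip elements
    // (the REPL transcript above leaves 98 behind).
    val buggy = mutable.Set(1, 2, 3, 4, 7, 0, 8, 98, 9)
    buggy.foreach(buggy.remove)
    println(s"left behind by the buggy loop: $buggy")

    // Snapshot first, then mutate: every element is visited and removed.
    val fixed = mutable.Set(1, 2, 3, 4, 7, 0, 8, 98, 9)
    fixed.toArray.foreach(fixed.remove)
    assert(fixed.isEmpty)
  }
}
```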
spark git commit: [SPARK-11984][SQL][PYTHON] Fix typos in doc for pivot for scala and python
Repository: spark Updated Branches: refs/heads/master c1f85fc71 -> faabdfa2b [SPARK-11984][SQL][PYTHON] Fix typos in doc for pivot for scala and python Author: felixcheung Closes #9967 from felixcheung/pypivotdoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/faabdfa2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/faabdfa2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/faabdfa2 Branch: refs/heads/master Commit: faabdfa2bd416ae514961535f1953e8e9e8b1f3f Parents: c1f85fc Author: felixcheung Authored: Wed Nov 25 10:36:35 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 10:36:35 2015 -0800 -- python/pyspark/sql/group.py| 6 +++--- sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/faabdfa2/python/pyspark/sql/group.py -- diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index d8ed7eb..1911588 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -169,11 +169,11 @@ class GroupedData(object): @since(1.6) def pivot(self, pivot_col, values=None): -"""Pivots a column of the current DataFrame and preform the specified aggregation. +"""Pivots a column of the current DataFrame and perform the specified aggregation. :param pivot_col: Column to pivot -:param values: Optional list of values of pivotColumn that will be translated to columns in -the output data frame. If values are not provided the method with do an immediate call +:param values: Optional list of values of pivot column that will be translated to columns in +the output DataFrame. If values are not provided the method will do an immediate call to .distinct() on the pivot column. >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect() http://git-wip-us.apache.org/repos/asf/spark/blob/faabdfa2/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index abd531c..13341a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -282,7 +282,7 @@ class GroupedData protected[sql]( } /** - * Pivots a column of the current [[DataFrame]] and preform the specified aggregation. + * Pivots a column of the current [[DataFrame]] and perform the specified aggregation. * There are two versions of pivot function: one that requires the caller to specify the list * of distinct values to pivot on, and one that does not. The latter is more concise but less * efficient, because Spark needs to first compute the list of distinct values internally. @@ -321,7 +321,7 @@ class GroupedData protected[sql]( } /** - * Pivots a column of the current [[DataFrame]] and preform the specified aggregation. + * Pivots a column of the current [[DataFrame]] and perform the specified aggregation. * There are two versions of pivot function: one that requires the caller to specify the list * of distinct values to pivot on, and one that does not. The latter is more concise but less * efficient, because Spark needs to first compute the list of distinct values internally. @@ -353,7 +353,7 @@ class GroupedData protected[sql]( } /** - * Pivots a column of the current [[DataFrame]] and preform the specified aggregation. 
+ * Pivots a column of the current [[DataFrame]] and perform the specified aggregation. * There are two versions of pivot function: one that requires the caller to specify the list * of distinct values to pivot on, and one that does not. The latter is more concise but less * efficient, because Spark needs to first compute the list of distinct values internally. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11984][SQL][PYTHON] Fix typos in doc for pivot for scala and python
Repository: spark Updated Branches: refs/heads/branch-1.6 400f66f7c -> 204f3601d [SPARK-11984][SQL][PYTHON] Fix typos in doc for pivot for scala and python Author: felixcheung Closes #9967 from felixcheung/pypivotdoc. (cherry picked from commit faabdfa2bd416ae514961535f1953e8e9e8b1f3f) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/204f3601 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/204f3601 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/204f3601 Branch: refs/heads/branch-1.6 Commit: 204f3601d81502fcd879cfd23376475443eedf30 Parents: 400f66f Author: felixcheung Authored: Wed Nov 25 10:36:35 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 10:36:59 2015 -0800 -- python/pyspark/sql/group.py| 6 +++--- sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/204f3601/python/pyspark/sql/group.py -- diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index d8ed7eb..1911588 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -169,11 +169,11 @@ class GroupedData(object): @since(1.6) def pivot(self, pivot_col, values=None): -"""Pivots a column of the current DataFrame and preform the specified aggregation. +"""Pivots a column of the current DataFrame and perform the specified aggregation. :param pivot_col: Column to pivot -:param values: Optional list of values of pivotColumn that will be translated to columns in -the output data frame. If values are not provided the method with do an immediate call +:param values: Optional list of values of pivot column that will be translated to columns in +the output DataFrame. If values are not provided the method will do an immediate call to .distinct() on the pivot column. >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect() http://git-wip-us.apache.org/repos/asf/spark/blob/204f3601/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index abd531c..13341a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -282,7 +282,7 @@ class GroupedData protected[sql]( } /** - * Pivots a column of the current [[DataFrame]] and preform the specified aggregation. + * Pivots a column of the current [[DataFrame]] and perform the specified aggregation. * There are two versions of pivot function: one that requires the caller to specify the list * of distinct values to pivot on, and one that does not. The latter is more concise but less * efficient, because Spark needs to first compute the list of distinct values internally. @@ -321,7 +321,7 @@ class GroupedData protected[sql]( } /** - * Pivots a column of the current [[DataFrame]] and preform the specified aggregation. + * Pivots a column of the current [[DataFrame]] and perform the specified aggregation. * There are two versions of pivot function: one that requires the caller to specify the list * of distinct values to pivot on, and one that does not. The latter is more concise but less * efficient, because Spark needs to first compute the list of distinct values internally. 
@@ -353,7 +353,7 @@ class GroupedData protected[sql]( } /** - * Pivots a column of the current [[DataFrame]] and preform the specified aggregation. + * Pivots a column of the current [[DataFrame]] and perform the specified aggregation. * There are two versions of pivot function: one that requires the caller to specify the list * of distinct values to pivot on, and one that does not. The latter is more concise but less * efficient, because Spark needs to first compute the list of distinct values internally. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
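For completeness, the Scala counterpart of the docstring example whose wording is being fixed; `df` is assumed to have `year`, `course` and `earnings` columns, matching the `df4` fixture in the Python doctest:

```scala
import org.apache.spark.sql.DataFrame

object PivotExample {
  def earningsByCourse(df: DataFrame): DataFrame = {
    // Passing the values explicitly skips the extra pass Spark would otherwise need
    // to compute the distinct values of the pivot column.
    df.groupBy("year")
      .pivot("course", Seq("dotNET", "Java"))
      .sum("earnings")
  }
}
```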
[2/2] spark git commit: [SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.
[SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer. - NettyRpcEnv::openStream() now correctly propagates errors to the read side of the pipe. - NettyStreamManager now throws if the file being transferred does not exist. - The network library now correctly handles zero-sized streams. Author: Marcelo Vanzin Closes #9941 from vanzin/SPARK-11956. (cherry picked from commit c1f85fc71e71e07534b89c84677d977bb20994f8) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/400f66f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/400f66f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/400f66f7 Branch: refs/heads/branch-1.6 Commit: 400f66f7c43f608444fb87505e1a789879928360 Parents: c8f26d2 Author: Marcelo Vanzin Authored: Wed Nov 25 09:47:20 2015 -0800 Committer: Marcelo Vanzin Committed: Wed Nov 25 10:00:50 2015 -0800 -- .../apache/spark/rpc/netty/NettyRpcEnv.scala| 19 + .../spark/rpc/netty/NettyStreamManager.scala| 2 +- .../org/apache/spark/rpc/RpcEnvSuite.scala | 27 ++- .../client/TransportResponseHandler.java| 28 +--- .../org/apache/spark/network/StreamSuite.java | 23 +++- 5 files changed, 75 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/400f66f7/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 7495f3e..98ce3d6 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -27,7 +27,7 @@ import javax.annotation.Nullable import scala.concurrent.{Future, Promise} import scala.reflect.ClassTag -import scala.util.{DynamicVariable, Failure, Success} +import scala.util.{DynamicVariable, Failure, Success, Try} import scala.util.control.NonFatal import org.apache.spark.{Logging, HttpFileServer, SecurityManager, SparkConf} @@ -375,13 +375,22 @@ private[netty] class NettyRpcEnv( @volatile private var error: Throwable = _ -def setError(e: Throwable): Unit = error = e +def setError(e: Throwable): Unit = { + error = e + source.close() +} override def read(dst: ByteBuffer): Int = { - if (error != null) { -throw error + val result = if (error == null) { +Try(source.read(dst)) + } else { +Failure(error) + } + + result match { +case Success(bytesRead) => bytesRead +case Failure(error) => throw error } - source.read(dst) } override def close(): Unit = source.close() http://git-wip-us.apache.org/repos/asf/spark/blob/400f66f7/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala index eb1d260..a2768b4 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala @@ -44,7 +44,7 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype") } -require(file != null, s"File not found: $streamId") +require(file != null && file.isFile(), s"File not found: $streamId") new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length()) } http://git-wip-us.apache.org/repos/asf/spark/blob/400f66f7/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala -- diff --git 
a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 2b664c6..6cc958a 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -729,23 +729,36 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val tempDir = Utils.createTempDir() val file = new File(tempDir, "file") Files.write(UUID.randomUUID().toString(), file, UTF_8) +val empty = new File(tempDir, "empty") +Files.write("", empty, UTF_8); val jar = new File(tempDir, "jar") Files.write(UUID.randomUUID().toString(), jar, UTF_8) val fileUri = env.fileServer.addFile(file) +val emptyUri = env.fileServer.addFile(empty) val jarUri = env.fileServer.addJa
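The openStream() fix above boils down to one pattern: when the producer side hits an error, record it and close the pipe so a blocked reader wakes up, and let read() surface either the stored error or the outcome of the read. Below is a minimal, self-contained sketch of that pattern using only the JDK's Pipe; the class name is illustrative and this is not the actual NettyRpcEnv inner class.

    import java.nio.ByteBuffer
    import java.nio.channels.{Pipe, ReadableByteChannel}
    import scala.util.{Failure, Success, Try}

    // Stores a failure from the producer side and propagates it to whoever is reading
    // from the pipe, instead of letting the reader hang or see a silent EOF.
    class ErrorPropagatingSource(source: Pipe.SourceChannel) extends ReadableByteChannel {

      @volatile private var error: Throwable = _

      def setError(e: Throwable): Unit = {
        error = e
        source.close() // unblocks a reader that is waiting on the pipe
      }

      override def read(dst: ByteBuffer): Int = {
        val result = if (error == null) Try(source.read(dst)) else Failure(error)
        result match {
          case Success(bytesRead) => bytesRead
          case Failure(t)         => throw t
        }
      }

      override def close(): Unit = source.close()
      override def isOpen(): Boolean = source.isOpen()
    }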
[1/2] spark git commit: [SPARK-11762][NETWORK] Account for active streams when counting outstanding requests.
Repository: spark Updated Branches: refs/heads/branch-1.6 2aeee5696 -> 400f66f7c [SPARK-11762][NETWORK] Account for active streams when couting outstanding requests. This way the timeout handling code can correctly close "hung" channels that are processing streams. Author: Marcelo Vanzin Closes #9747 from vanzin/SPARK-11762. (cherry picked from commit 5231cd5acaae69d735ba3209531705cc222f3cfb) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8f26d2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8f26d2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8f26d2e Branch: refs/heads/branch-1.6 Commit: c8f26d2e818f935add8b34ef58de7c7cac2a60af Parents: 2aeee56 Author: Marcelo Vanzin Authored: Mon Nov 23 10:45:23 2015 -0800 Committer: Marcelo Vanzin Committed: Wed Nov 25 10:00:45 2015 -0800 -- .../spark/network/client/StreamInterceptor.java | 12 - .../client/TransportResponseHandler.java| 15 +-- .../network/TransportResponseHandlerSuite.java | 27 3 files changed, 51 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c8f26d2e/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java b/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java index 02230a0..88ba3cc 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java +++ b/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java @@ -30,13 +30,19 @@ import org.apache.spark.network.util.TransportFrameDecoder; */ class StreamInterceptor implements TransportFrameDecoder.Interceptor { + private final TransportResponseHandler handler; private final String streamId; private final long byteCount; private final StreamCallback callback; private volatile long bytesRead; - StreamInterceptor(String streamId, long byteCount, StreamCallback callback) { + StreamInterceptor( + TransportResponseHandler handler, + String streamId, + long byteCount, + StreamCallback callback) { +this.handler = handler; this.streamId = streamId; this.byteCount = byteCount; this.callback = callback; @@ -45,11 +51,13 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor { @Override public void exceptionCaught(Throwable cause) throws Exception { +handler.deactivateStream(); callback.onFailure(streamId, cause); } @Override public void channelInactive() throws Exception { +handler.deactivateStream(); callback.onFailure(streamId, new ClosedChannelException()); } @@ -65,8 +73,10 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor { RuntimeException re = new IllegalStateException(String.format( "Read too many bytes? 
Expected %d, but read %d.", byteCount, bytesRead)); callback.onFailure(streamId, re); + handler.deactivateStream(); throw re; } else if (bytesRead == byteCount) { + handler.deactivateStream(); callback.onComplete(streamId); } http://git-wip-us.apache.org/repos/asf/spark/blob/c8f26d2e/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index fc7bdde..be181e0 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ public class TransportResponseHandler extends MessageHandler { private final Map outstandingRpcs; private final Queue streamCallbacks; + private volatile boolean streamActive; /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */ private final AtomicLong timeOfLastRequestNs; @@ -87,9 +89,15 @@ public class TransportResponseHandler extends MessageHandler { } public void addStreamCallback(StreamCallback callback) { +timeOfLastRequestNs.s
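The patch itself is in Java, but the bookkeeping it adds is easy to state on its own: a stream that is currently being intercepted must count as outstanding work, and starting a stream must reset the idle timer, otherwise the idle-timeout handler may close a channel that is busy receiving a stream. A simplified Scala model of that accounting follows; the field names echo TransportResponseHandler, but the element types are placeholders.

    import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
    import java.util.concurrent.atomic.AtomicLong

    // Minimal model of "outstanding request" accounting that also covers active streams.
    class OutstandingRequestTracker {
      private val outstandingFetches = new ConcurrentHashMap[Long, AnyRef]()
      private val outstandingRpcs = new ConcurrentHashMap[Long, AnyRef]()
      private val streamCallbacks = new ConcurrentLinkedQueue[AnyRef]()
      @volatile private var streamActive: Boolean = false
      private val timeOfLastRequestNs = new AtomicLong(0L)

      def addStreamCallback(callback: AnyRef): Unit = {
        timeOfLastRequestNs.set(System.nanoTime()) // streams now reset the idle clock too
        streamCallbacks.offer(callback)
      }

      def activateStream(): Unit = { streamActive = true }

      // Called from every terminal path of the interceptor (complete, failure, channel inactive).
      def deactivateStream(): Unit = { streamActive = false }

      // A stream being processed counts as one outstanding request, so timeout handling
      // sees the channel as busy rather than idle.
      def numOutstandingRequests: Int =
        outstandingFetches.size() + outstandingRpcs.size() + streamCallbacks.size() +
          (if (streamActive) 1 else 0)
    }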
spark git commit: [SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.
Repository: spark Updated Branches: refs/heads/master 0a5aef753 -> c1f85fc71 [SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer. - NettyRpcEnv::openStream() now correctly propagates errors to the read side of the pipe. - NettyStreamManager now throws if the file being transferred does not exist. - The network library now correctly handles zero-sized streams. Author: Marcelo Vanzin Closes #9941 from vanzin/SPARK-11956. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1f85fc7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1f85fc7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1f85fc7 Branch: refs/heads/master Commit: c1f85fc71e71e07534b89c84677d977bb20994f8 Parents: 0a5aef7 Author: Marcelo Vanzin Authored: Wed Nov 25 09:47:20 2015 -0800 Committer: Marcelo Vanzin Committed: Wed Nov 25 09:47:20 2015 -0800 -- .../apache/spark/rpc/netty/NettyRpcEnv.scala| 19 + .../spark/rpc/netty/NettyStreamManager.scala| 2 +- .../org/apache/spark/rpc/RpcEnvSuite.scala | 27 ++- .../client/TransportResponseHandler.java| 28 +--- .../org/apache/spark/network/StreamSuite.java | 23 +++- 5 files changed, 75 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1f85fc7/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 68701f6..c8fa870 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -27,7 +27,7 @@ import javax.annotation.Nullable import scala.concurrent.{Future, Promise} import scala.reflect.ClassTag -import scala.util.{DynamicVariable, Failure, Success} +import scala.util.{DynamicVariable, Failure, Success, Try} import scala.util.control.NonFatal import org.apache.spark.{Logging, SecurityManager, SparkConf} @@ -368,13 +368,22 @@ private[netty] class NettyRpcEnv( @volatile private var error: Throwable = _ -def setError(e: Throwable): Unit = error = e +def setError(e: Throwable): Unit = { + error = e + source.close() +} override def read(dst: ByteBuffer): Int = { - if (error != null) { -throw error + val result = if (error == null) { +Try(source.read(dst)) + } else { +Failure(error) + } + + result match { +case Success(bytesRead) => bytesRead +case Failure(error) => throw error } - source.read(dst) } override def close(): Unit = source.close() http://git-wip-us.apache.org/repos/asf/spark/blob/c1f85fc7/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala index eb1d260..a2768b4 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala @@ -44,7 +44,7 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype") } -require(file != null, s"File not found: $streamId") +require(file != null && file.isFile(), s"File not found: $streamId") new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length()) } http://git-wip-us.apache.org/repos/asf/spark/blob/c1f85fc7/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala -- diff --git 
a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 2b664c6..6cc958a 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -729,23 +729,36 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val tempDir = Utils.createTempDir() val file = new File(tempDir, "file") Files.write(UUID.randomUUID().toString(), file, UTF_8) +val empty = new File(tempDir, "empty") +Files.write("", empty, UTF_8); val jar = new File(tempDir, "jar") Files.write(UUID.randomUUID().toString(), jar, UTF_8) val fileUri = env.fileServer.addFile(file) +val emptyUri = env.fileServer.addFile(empty) val jarUri = env.fileServer.addJar(jar)
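The other half of the change is the tightened check in NettyStreamManager: `file != null` alone let a registered-but-missing entry (or a directory) through, producing a broken transfer on the receiving end. A reduced sketch of the serving-side check is below; the stream-id parsing and map types are illustrative, and Spark wraps the result in a FileSegmentManagedBuffer rather than returning the File itself.

    import java.io.File

    // Resolve the file behind a stream id and refuse anything that is not an existing
    // regular file, instead of failing later (or not at all) on the receiving side.
    class SimpleFileStreamManager(files: Map[String, File], jars: Map[String, File]) {

      def openStream(streamId: String): File = {
        val Array(ftype, fname) = streamId.stripPrefix("/").split("/", 2)
        val file = ftype match {
          case "files" => files.get(fname).orNull
          case "jars"  => jars.get(fname).orNull
          case _       => throw new IllegalArgumentException(s"Invalid file type: $ftype")
        }
        require(file != null && file.isFile(), s"File not found: $streamId")
        file
      }
    }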
spark git commit: [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage
Repository: spark Updated Branches: refs/heads/branch-1.5 27b5f31a0 -> 4139a4ed1 [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage This issue was addressed in https://github.com/apache/spark/pull/5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the `jobId` parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880. The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks. This fix should be applied to all maintenance branches, since it has existed since 1.0. kayousterhout pankajarora12 Author: Mark Hamstra Author: Imran Rashid Closes #6291 from markhamstra/SPARK-6880. (cherry picked from commit 0a5aef753e70e93d7e56054f354a52e4d4e18932) Signed-off-by: Imran Rashid Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4139a4ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4139a4ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4139a4ed Branch: refs/heads/branch-1.5 Commit: 4139a4ed1f024b1a88e4efe613acf45175f82318 Parents: 27b5f31 Author: Mark Hamstra Authored: Wed Nov 25 09:34:34 2015 -0600 Committer: Imran Rashid Committed: Wed Nov 25 09:38:34 2015 -0600 -- .../apache/spark/scheduler/DAGScheduler.scala | 6 +- .../spark/scheduler/DAGSchedulerSuite.scala | 107 ++- 2 files changed, 109 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4139a4ed/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb2494d..c8f996c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -810,7 +810,9 @@ class DAGScheduler( stage.resetInternalAccumulators() } -val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull +// Use the scheduling pool, job group, description, etc. 
from an ActiveJob associated +// with this Stage +val properties = jobIdToActiveJob(jobId).properties runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are @@ -905,7 +907,7 @@ class DAGScheduler( stage.pendingTasks ++= tasks logDebug("New pending tasks: " + stage.pendingTasks) taskScheduler.submitTasks(new TaskSet( -tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties)) +tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark http://git-wip-us.apache.org/repos/asf/spark/blob/4139a4ed/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 7232970..88745f2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.Properties + import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -234,9 +236,10 @@ class DAGSchedulerSuite rdd: RDD[_], partitions: Array[Int], func: (TaskContext, Iterator[_]) => _ = jobComputeFunc, - listener: JobListener = jobListener): Int = { + listener: JobListener = jobListener, + properties: Properties = null): Int = { val jobId = scheduler.nextJobId.getAndIncrement() -runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener)) +runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties)) jobId } @@ -750,6 +753,106 @@ class DAGSchedulerSuite assertDataStructuresEmpty() }
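The substantive change is the single line that picks the Properties: look them up by the jobId the tasks are being run for, not by the job that originally created the stage. A stripped-down model of that lookup is below; ActiveJob and Stage here are simplified stand-ins for the scheduler's classes.

    import java.util.Properties
    import scala.collection.mutable

    case class ActiveJob(jobId: Int, properties: Properties)
    case class Stage(id: Int, firstJobId: Int)

    val jobIdToActiveJob = mutable.HashMap[Int, ActiveJob]()

    def propertiesFor(stage: Stage, jobId: Int): Properties = {
      // Before (the band-aid from PR #5494): keyed on the job that first created the stage,
      // which may no longer be active, so the properties were silently dropped:
      //   jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
      // After: keyed on the ActiveJob the tasks are actually submitted for, which is
      // guaranteed to exist at the call sites of submitMissingTasks:
      jobIdToActiveJob(jobId).properties
    }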
spark git commit: [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage
Repository: spark Updated Branches: refs/heads/branch-1.6 4971eaaa5 -> 2aeee5696 [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage This issue was addressed in https://github.com/apache/spark/pull/5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the `jobId` parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880. The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks. This fix should be applied to all maintenance branches, since it has existed since 1.0. kayousterhout pankajarora12 Author: Mark Hamstra Author: Imran Rashid Closes #6291 from markhamstra/SPARK-6880. (cherry picked from commit 0a5aef753e70e93d7e56054f354a52e4d4e18932) Signed-off-by: Imran Rashid Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aeee569 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aeee569 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aeee569 Branch: refs/heads/branch-1.6 Commit: 2aeee56961b27984fa43253bd63ac94b06162bfb Parents: 4971eaa Author: Mark Hamstra Authored: Wed Nov 25 09:34:34 2015 -0600 Committer: Imran Rashid Committed: Wed Nov 25 09:34:56 2015 -0600 -- .../apache/spark/scheduler/DAGScheduler.scala | 6 +- .../spark/scheduler/DAGSchedulerSuite.scala | 107 ++- 2 files changed, 109 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2aeee569/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 77a184d..e01a960 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -946,7 +946,9 @@ class DAGScheduler( stage.resetInternalAccumulators() } -val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull +// Use the scheduling pool, job group, description, etc. 
from an ActiveJob associated +// with this Stage +val properties = jobIdToActiveJob(jobId).properties runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are @@ -1047,7 +1049,7 @@ class DAGScheduler( stage.pendingPartitions ++= tasks.map(_.partitionId) logDebug("New pending partitions: " + stage.pendingPartitions) taskScheduler.submitTasks(new TaskSet( -tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties)) +tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark http://git-wip-us.apache.org/repos/asf/spark/blob/2aeee569/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4d6b254..653d41f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.Properties + import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -262,9 +264,10 @@ class DAGSchedulerSuite rdd: RDD[_], partitions: Array[Int], func: (TaskContext, Iterator[_]) => _ = jobComputeFunc, - listener: JobListener = jobListener): Int = { + listener: JobListener = jobListener, + properties: Properties = null): Int = { val jobId = scheduler.nextJobId.getAndIncrement() -runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener)) +runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties)) jobId } @@ -1322,6 +1325,106 @@ class DAGSchedulerSuite
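On the test side, the suite's submit helper simply gains a defaulted properties argument so existing call sites keep compiling while new tests can attach job properties. A sketch of that pattern, with the suite's fixtures (runEvent, JobSubmitted, CallSite, the RDD and listener) elided into a comment:

    import java.util.Properties
    import java.util.concurrent.atomic.AtomicInteger

    val nextJobId = new AtomicInteger(0)

    // Defaulting the new parameter keeps every existing submit(...) call valid.
    def submit(partitions: Array[Int], properties: Properties = null): Int = {
      val jobId = nextJobId.getAndIncrement()
      // runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
      jobId
    }

    // A new test can now thread properties through explicitly:
    val props = new Properties()
    props.setProperty("testProperty", "expectedValue")
    val jobId = submit(Array(0), properties = props)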
spark git commit: [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage
Repository: spark Updated Branches: refs/heads/master b9b6fbe89 -> 0a5aef753 [SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage This issue was addressed in https://github.com/apache/spark/pull/5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the `jobId` parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880. The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks. This fix should be applied to all maintenance branches, since it has existed since 1.0. kayousterhout pankajarora12 Author: Mark Hamstra Author: Imran Rashid Closes #6291 from markhamstra/SPARK-6880. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a5aef75 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a5aef75 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a5aef75 Branch: refs/heads/master Commit: 0a5aef753e70e93d7e56054f354a52e4d4e18932 Parents: b9b6fbe Author: Mark Hamstra Authored: Wed Nov 25 09:34:34 2015 -0600 Committer: Imran Rashid Committed: Wed Nov 25 09:34:34 2015 -0600 -- .../apache/spark/scheduler/DAGScheduler.scala | 6 +- .../spark/scheduler/DAGSchedulerSuite.scala | 107 ++- 2 files changed, 109 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a5aef75/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 77a184d..e01a960 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -946,7 +946,9 @@ class DAGScheduler( stage.resetInternalAccumulators() } -val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull +// Use the scheduling pool, job group, description, etc. 
from an ActiveJob associated +// with this Stage +val properties = jobIdToActiveJob(jobId).properties runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are @@ -1047,7 +1049,7 @@ class DAGScheduler( stage.pendingPartitions ++= tasks.map(_.partitionId) logDebug("New pending partitions: " + stage.pendingPartitions) taskScheduler.submitTasks(new TaskSet( -tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties)) +tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark http://git-wip-us.apache.org/repos/asf/spark/blob/0a5aef75/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4d6b254..653d41f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.Properties + import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -262,9 +264,10 @@ class DAGSchedulerSuite rdd: RDD[_], partitions: Array[Int], func: (TaskContext, Iterator[_]) => _ = jobComputeFunc, - listener: JobListener = jobListener): Int = { + listener: JobListener = jobListener, + properties: Properties = null): Int = { val jobId = scheduler.nextJobId.getAndIncrement() -runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener)) +runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties)) jobId } @@ -1322,6 +1325,106 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: Str
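What the fix preserves in practice: local properties set on the submitting thread, such as the fair-scheduler pool, job group and description, travel with the ActiveJob and now reliably reach the TaskSets of every stage run for that job. A small user-facing example with the public SparkContext API; the pool and group names are made up.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("pool-demo"))

    sc.setLocalProperty("spark.scheduler.pool", "production")              // fair-scheduler pool
    sc.setJobGroup("nightly-report", "aggregation for the nightly report")

    // Every stage submitted for this job should carry the properties above.
    val total = sc.parallelize(1 to 1000).map(_ * 2).sum()

    sc.clearJobGroup()
    sc.stop()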
spark git commit: [SPARK-11860][PYSPARK][DOCUMENTATION] Invalid argument specification …
Repository: spark Updated Branches: refs/heads/master 638500265 -> b9b6fbe89 [SPARK-11860][PYSPARK][DOCUMENTATION] Invalid argument specification … …for registerFunction [Python] Straightforward change on the python doc Author: Jeff Zhang Closes #9901 from zjffdu/SPARK-11860. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9b6fbe8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9b6fbe8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9b6fbe8 Branch: refs/heads/master Commit: b9b6fbe89b6d1a890faa02c1a53bb670a6255362 Parents: 6385002 Author: Jeff Zhang Authored: Wed Nov 25 13:49:58 2015 + Committer: Sean Owen Committed: Wed Nov 25 13:49:58 2015 + -- python/pyspark/sql/context.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b9b6fbe8/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 5a85ac3..a49c1b5 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -195,14 +195,15 @@ class SQLContext(object): @ignore_unicode_prefix @since(1.2) def registerFunction(self, name, f, returnType=StringType()): -"""Registers a lambda function as a UDF so it can be used in SQL statements. +"""Registers a python function (including lambda function) as a UDF +so it can be used in SQL statements. In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically be done. For any other return type, the produced object must match the specified type. :param name: name of the UDF -:param samplingRatio: lambda function +:param f: python function :param returnType: a :class:`DataType` object >>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))
spark git commit: [SPARK-11860][PYSPARK][DOCUMENTATION] Invalid argument specification …
Repository: spark Updated Branches: refs/heads/branch-1.6 a986a3bde -> 4971eaaa5 [SPARK-11860][PYSPARK][DOCUMENTATION] Invalid argument specification … …for registerFunction [Python] Straightforward change on the python doc Author: Jeff Zhang Closes #9901 from zjffdu/SPARK-11860. (cherry picked from commit b9b6fbe89b6d1a890faa02c1a53bb670a6255362) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4971eaaa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4971eaaa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4971eaaa Branch: refs/heads/branch-1.6 Commit: 4971eaaa5768ad20c5a77278e435c97409a9ca8f Parents: a986a3b Author: Jeff Zhang Authored: Wed Nov 25 13:49:58 2015 + Committer: Sean Owen Committed: Wed Nov 25 13:50:10 2015 + -- python/pyspark/sql/context.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4971eaaa/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 5a85ac3..a49c1b5 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -195,14 +195,15 @@ class SQLContext(object): @ignore_unicode_prefix @since(1.2) def registerFunction(self, name, f, returnType=StringType()): -"""Registers a lambda function as a UDF so it can be used in SQL statements. +"""Registers a python function (including lambda function) as a UDF +so it can be used in SQL statements. In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically be done. For any other return type, the produced object must match the specified type. :param name: name of the UDF -:param samplingRatio: lambda function +:param f: python function :param returnType: a :class:`DataType` object >>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))
spark git commit: [SPARK-11686][CORE] Issue WARN when dynamic allocation is disabled due to spark.dynamicAllocation.enabled and spark.executor.instances both set
Repository: spark Updated Branches: refs/heads/branch-1.6 997896643 -> a986a3bde [SPARK-11686][CORE] Issue WARN when dynamic allocation is disabled due to spark.dynamicAllocation.enabled and spark.executor.instances both set Changed the log type to a 'warning' instead of 'info' as required. Author: Ashwin Swaroop Closes #9926 from ashwinswaroop/master. (cherry picked from commit 63850026576b3ea7783f9d4b975171dc3cff6e4c) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a986a3bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a986a3bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a986a3bd Branch: refs/heads/branch-1.6 Commit: a986a3bde7426fbbc6b95848dcc55f1d6f22a1b1 Parents: 9978966 Author: Ashwin Swaroop Authored: Wed Nov 25 13:41:14 2015 + Committer: Sean Owen Committed: Wed Nov 25 13:41:26 2015 + -- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a986a3bd/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e19ba11..2c10779 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -556,7 +556,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Optionally scale number of executors dynamically based on workload. Exposed for testing. val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) { - logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.") + logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.") } _executorAllocationManager =
spark git commit: [SPARK-11686][CORE] Issue WARN when dynamic allocation is disabled due to spark.dynamicAllocation.enabled and spark.executor.instances both set
Repository: spark Updated Branches: refs/heads/master a0f1a1183 -> 638500265 [SPARK-11686][CORE] Issue WARN when dynamic allocation is disabled due to spark.dynamicAllocation.enabled and spark.executor.instances both set Changed the log type to a 'warning' instead of 'info' as required. Author: Ashwin Swaroop Closes #9926 from ashwinswaroop/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63850026 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63850026 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63850026 Branch: refs/heads/master Commit: 63850026576b3ea7783f9d4b975171dc3cff6e4c Parents: a0f1a11 Author: Ashwin Swaroop Authored: Wed Nov 25 13:41:14 2015 + Committer: Sean Owen Committed: Wed Nov 25 13:41:14 2015 + -- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/63850026/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e19ba11..2c10779 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -556,7 +556,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Optionally scale number of executors dynamically based on workload. Exposed for testing. val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) { - logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.") + logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.") } _executorAllocationManager =
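For reference, this is the configuration combination that now produces the warning: dynamic allocation is requested, but a fixed executor count is also present, so dynamic allocation is silently turned off. The master URL and app name below are placeholders.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("dyn-alloc-demo")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "4") // a fixed executor count disables dynamic allocation

    // Starting a SparkContext with this conf now logs:
    //   WARN SparkContext: Dynamic Allocation and num executors both set, thus dynamic allocation disabled.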
spark git commit: [SPARK-11981][SQL] Move implementations of methods back to DataFrame from Queryable
Repository: spark Updated Branches: refs/heads/master 2610e0612 -> a0f1a1183 [SPARK-11981][SQL] Move implementations of methods back to DataFrame from Queryable Also added show methods to Dataset. Author: Reynold Xin Closes #9964 from rxin/SPARK-11981. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0f1a118 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0f1a118 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0f1a118 Branch: refs/heads/master Commit: a0f1a11837bfffb76582499d36fbaf21a1d628cb Parents: 2610e06 Author: Reynold Xin Authored: Wed Nov 25 01:03:18 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 01:03:18 2015 -0800 -- .../scala/org/apache/spark/sql/DataFrame.scala | 35 - .../scala/org/apache/spark/sql/Dataset.scala| 77 +++- .../apache/spark/sql/execution/Queryable.scala | 32 ++-- 3 files changed, 111 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a0f1a118/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 5eca1db..d8319b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser} -import org.apache.spark.sql.execution.{EvaluatePython, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution} +import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution} import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.sources.HadoopFsRelation @@ -112,8 +112,8 @@ private[sql] object DataFrame { */ @Experimental class DataFrame private[sql]( -@transient val sqlContext: SQLContext, -@DeveloperApi @transient val queryExecution: QueryExecution) +@transient override val sqlContext: SQLContext, +@DeveloperApi @transient override val queryExecution: QueryExecution) extends Queryable with Serializable { // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure @@ -283,6 +283,35 @@ class DataFrame private[sql]( def schema: StructType = queryExecution.analyzed.schema /** + * Prints the schema to the console in a nice tree format. + * @group basic + * @since 1.3.0 + */ + // scalastyle:off println + override def printSchema(): Unit = println(schema.treeString) + // scalastyle:on println + + /** + * Prints the plans (logical and physical) to the console for debugging purposes. + * @group basic + * @since 1.3.0 + */ + override def explain(extended: Boolean): Unit = { +val explain = ExplainCommand(queryExecution.logical, extended = extended) +sqlContext.executePlan(explain).executedPlan.executeCollect().foreach { + // scalastyle:off println + r => println(r.getString(0)) + // scalastyle:on println +} + } + + /** + * Prints the physical plan to the console for debugging purposes. 
+ * @since 1.3.0 + */ + override def explain(): Unit = explain(extended = false) + + /** * Returns all column names and their data types as an array. * @group basic * @since 1.3.0 http://git-wip-us.apache.org/repos/asf/spark/blob/a0f1a118/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 97eb5b9..da46001 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -61,8 +61,8 @@ import org.apache.spark.util.Utils */ @Experimental class Dataset[T] private[sql]( -@transient val sqlContext: SQLContext, -@transient val queryExecution: QueryExecution, +@transient override val sqlContext: SQLContext, +@transient override val queryExecution: QueryExecution, tEncoder: Encoder[T]) extends Queryable with Serializable { /** @@ -85,7 +85,25 @@ class Dataset[T] private[sql]( * Returns the schema of the encoded form of the objects in this [[Dataset]]. * @since
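After this change Queryable only declares the methods while DataFrame and Dataset carry the implementations, which is why the diff adds override modifiers to sqlContext and queryExecution. A toy model of that structure, with plain String standing in for StructType and the plan types:

    // Declarations live in the trait; concrete classes override them.
    trait Queryable {
      def schema: String
      def printSchema(): Unit
      def explain(extended: Boolean): Unit
      def explain(): Unit
    }

    class MiniFrame(treeString: String, physicalPlan: String) extends Queryable {
      override def schema: String = treeString
      override def printSchema(): Unit = println(schema)
      override def explain(extended: Boolean): Unit =
        println(if (extended) s"== Logical and Physical Plans ==\n$physicalPlan"
                else s"== Physical Plan ==\n$physicalPlan")
      override def explain(): Unit = explain(extended = false)
    }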
spark git commit: [SPARK-11981][SQL] Move implementations of methods back to DataFrame from Queryable
Repository: spark Updated Branches: refs/heads/branch-1.6 007eb4ac0 -> 997896643 [SPARK-11981][SQL] Move implementations of methods back to DataFrame from Queryable Also added show methods to Dataset. Author: Reynold Xin Closes #9964 from rxin/SPARK-11981. (cherry picked from commit a0f1a11837bfffb76582499d36fbaf21a1d628cb) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99789664 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99789664 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99789664 Branch: refs/heads/branch-1.6 Commit: 9978966438f05170cb6fdb8d304363015486363c Parents: 007eb4a Author: Reynold Xin Authored: Wed Nov 25 01:03:18 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 01:03:25 2015 -0800 -- .../scala/org/apache/spark/sql/DataFrame.scala | 35 - .../scala/org/apache/spark/sql/Dataset.scala| 77 +++- .../apache/spark/sql/execution/Queryable.scala | 32 ++-- 3 files changed, 111 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/99789664/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 5eca1db..d8319b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser} -import org.apache.spark.sql.execution.{EvaluatePython, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution} +import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution} import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.sources.HadoopFsRelation @@ -112,8 +112,8 @@ private[sql] object DataFrame { */ @Experimental class DataFrame private[sql]( -@transient val sqlContext: SQLContext, -@DeveloperApi @transient val queryExecution: QueryExecution) +@transient override val sqlContext: SQLContext, +@DeveloperApi @transient override val queryExecution: QueryExecution) extends Queryable with Serializable { // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure @@ -283,6 +283,35 @@ class DataFrame private[sql]( def schema: StructType = queryExecution.analyzed.schema /** + * Prints the schema to the console in a nice tree format. + * @group basic + * @since 1.3.0 + */ + // scalastyle:off println + override def printSchema(): Unit = println(schema.treeString) + // scalastyle:on println + + /** + * Prints the plans (logical and physical) to the console for debugging purposes. + * @group basic + * @since 1.3.0 + */ + override def explain(extended: Boolean): Unit = { +val explain = ExplainCommand(queryExecution.logical, extended = extended) +sqlContext.executePlan(explain).executedPlan.executeCollect().foreach { + // scalastyle:off println + r => println(r.getString(0)) + // scalastyle:on println +} + } + + /** + * Prints the physical plan to the console for debugging purposes. 
+ * @since 1.3.0 + */ + override def explain(): Unit = explain(extended = false) + + /** * Returns all column names and their data types as an array. * @group basic * @since 1.3.0 http://git-wip-us.apache.org/repos/asf/spark/blob/99789664/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 97eb5b9..da46001 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -61,8 +61,8 @@ import org.apache.spark.util.Utils */ @Experimental class Dataset[T] private[sql]( -@transient val sqlContext: SQLContext, -@transient val queryExecution: QueryExecution, +@transient override val sqlContext: SQLContext, +@transient override val queryExecution: QueryExecution, tEncoder: Encoder[T]) extends Queryable with Serializable { /** @@ -85,7 +85,25 @@ class Dataset[T] p
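Typical use of the methods this commit touches, assuming a Spark 1.6 shell where sqlContext and its implicits are available:

    import sqlContext.implicits._

    val df = sqlContext.range(0, 10)   // DataFrame with a single "id" column
    df.printSchema()                   // tree-formatted schema
    df.explain()                       // physical plan only
    df.explain(extended = true)        // logical and physical plans

    val ds = Seq(1, 2, 3).toDS()
    ds.show()                          // show() is among the methods added to Dataset here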
spark git commit: [SPARK-11970][SQL] Adding JoinType into JoinWith and support Sample in Dataset API
Repository: spark Updated Branches: refs/heads/branch-1.6 7f030aa42 -> 007eb4ac0 [SPARK-11970][SQL] Adding JoinType into JoinWith and support Sample in Dataset API Except inner join, maybe the other join types are also useful when users are using the joinWith function. Thus, added the joinType into the existing joinWith call in Dataset APIs. Also providing another joinWith interface for the cartesian-join-like functionality. Please provide your opinions. marmbrus rxin cloud-fan Thank you! Author: gatorsmile Closes #9921 from gatorsmile/joinWith. (cherry picked from commit 2610e06124c7fc0b2b1cfb2e3050a35ab492fb71) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/007eb4ac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/007eb4ac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/007eb4ac Branch: refs/heads/branch-1.6 Commit: 007eb4ac0a0e8ae68126e7815dd14d23953c207e Parents: 7f030aa Author: gatorsmile Authored: Wed Nov 25 01:02:36 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 01:02:42 2015 -0800 -- .../scala/org/apache/spark/sql/Dataset.scala| 45 .../org/apache/spark/sql/DatasetSuite.scala | 36 +--- 2 files changed, 65 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/007eb4ac/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index dd84b8b..97eb5b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -20,16 +20,16 @@ package org.apache.spark.sql import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental -import org.apache.spark.rdd.RDD import org.apache.spark.api.java.function._ - +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias -import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{Queryable, QueryExecution} import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils /** * :: Experimental :: @@ -83,7 +83,6 @@ class Dataset[T] private[sql]( /** * Returns the schema of the encoded form of the objects in this [[Dataset]]. - * * @since 1.6.0 */ def schema: StructType = resolvedTEncoder.schema @@ -185,7 +184,6 @@ class Dataset[T] private[sql]( * .transform(featurize) * .transform(...) * }}} - * * @since 1.6.0 */ def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this) @@ -453,6 +451,21 @@ class Dataset[T] private[sql]( c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] = selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, U5)]] + /** + * Returns a new [[Dataset]] by sampling a fraction of records. + * @since 1.6.0 + */ + def sample(withReplacement: Boolean, fraction: Double, seed: Long) : Dataset[T] = +withPlan(Sample(0.0, fraction, withReplacement, seed, _)) + + /** + * Returns a new [[Dataset]] by sampling a fraction of records, using a random seed. 
+ * @since 1.6.0 + */ + def sample(withReplacement: Boolean, fraction: Double) : Dataset[T] = { +sample(withReplacement, fraction, Utils.random.nextLong) + } + /* * * Set operations * * */ @@ -511,13 +524,17 @@ class Dataset[T] private[sql]( * types as well as working with relational data where either side of the join has column * names in common. * + * @param other Right side of the join. + * @param condition Join expression. + * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. * @since 1.6.0 */ - def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = { + def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = { val left = this.logicalPlan val right = other.logicalPlan -val joined = sqlContext.executePlan(Join(left, right, Inner, Some(condition.expr))) +val joined = sqlContext.executePlan(Join(left, right, joinType = + JoinType(joinType), Some(condition.expr))) val leftOutput = joined.analyzed.output.take(left.output.length) val rightOutput = joined.analyzed.output.takeRight(right.output.length) @@ -540,6 +557,18 @@ class Dataset[T] private[sql](
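Usage of the two additions, in the spirit of the new DatasetSuite cases; this assumes a Spark 1.6 sqlContext with its implicits imported:

    import sqlContext.implicits._

    val ds1 = Seq(1, 2, 3).toDS().as("a")
    val ds2 = Seq(1, 2).toDS().as("b")

    // joinWith now accepts a join type, so unmatched rows can be kept.
    val leftOuter = ds1.joinWith(ds2, $"a.value" === $"b.value", "left_outer")

    // sample mirrors the RDD/DataFrame API: a fraction with or without an explicit seed.
    val sampled           = ds1.sample(withReplacement = false, fraction = 0.5, seed = 42L)
    val sampledRandomSeed = ds1.sample(withReplacement = false, fraction = 0.5)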
spark git commit: [SPARK-11970][SQL] Adding JoinType into JoinWith and support Sample in Dataset API
Repository: spark Updated Branches: refs/heads/master 216988688 -> 2610e0612 [SPARK-11970][SQL] Adding JoinType into JoinWith and support Sample in Dataset API Except inner join, maybe the other join types are also useful when users are using the joinWith function. Thus, added the joinType into the existing joinWith call in Dataset APIs. Also providing another joinWith interface for the cartesian-join-like functionality. Please provide your opinions. marmbrus rxin cloud-fan Thank you! Author: gatorsmile Closes #9921 from gatorsmile/joinWith. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2610e061 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2610e061 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2610e061 Branch: refs/heads/master Commit: 2610e06124c7fc0b2b1cfb2e3050a35ab492fb71 Parents: 2169886 Author: gatorsmile Authored: Wed Nov 25 01:02:36 2015 -0800 Committer: Reynold Xin Committed: Wed Nov 25 01:02:36 2015 -0800 -- .../scala/org/apache/spark/sql/Dataset.scala| 45 .../org/apache/spark/sql/DatasetSuite.scala | 36 +--- 2 files changed, 65 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2610e061/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index dd84b8b..97eb5b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -20,16 +20,16 @@ package org.apache.spark.sql import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental -import org.apache.spark.rdd.RDD import org.apache.spark.api.java.function._ - +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias -import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{Queryable, QueryExecution} import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils /** * :: Experimental :: @@ -83,7 +83,6 @@ class Dataset[T] private[sql]( /** * Returns the schema of the encoded form of the objects in this [[Dataset]]. - * * @since 1.6.0 */ def schema: StructType = resolvedTEncoder.schema @@ -185,7 +184,6 @@ class Dataset[T] private[sql]( * .transform(featurize) * .transform(...) * }}} - * * @since 1.6.0 */ def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this) @@ -453,6 +451,21 @@ class Dataset[T] private[sql]( c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] = selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, U5)]] + /** + * Returns a new [[Dataset]] by sampling a fraction of records. + * @since 1.6.0 + */ + def sample(withReplacement: Boolean, fraction: Double, seed: Long) : Dataset[T] = +withPlan(Sample(0.0, fraction, withReplacement, seed, _)) + + /** + * Returns a new [[Dataset]] by sampling a fraction of records, using a random seed. 
+ * @since 1.6.0 + */ + def sample(withReplacement: Boolean, fraction: Double) : Dataset[T] = { +sample(withReplacement, fraction, Utils.random.nextLong) + } + /* * * Set operations * * */ @@ -511,13 +524,17 @@ class Dataset[T] private[sql]( * types as well as working with relational data where either side of the join has column * names in common. * + * @param other Right side of the join. + * @param condition Join expression. + * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. * @since 1.6.0 */ - def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = { + def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = { val left = this.logicalPlan val right = other.logicalPlan -val joined = sqlContext.executePlan(Join(left, right, Inner, Some(condition.expr))) +val joined = sqlContext.executePlan(Join(left, right, joinType = + JoinType(joinType), Some(condition.expr))) val leftOutput = joined.analyzed.output.take(left.output.length) val rightOutput = joined.analyzed.output.takeRight(right.output.length) @@ -540,6 +557,18 @@ class Dataset[T] private[sql]( } } + /** + * Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for eac
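The entry cuts off inside the scaladoc of the convenience overload: the two-argument joinWith keeps the previous behaviour (an inner join returning a Dataset of pairs) and is equivalent to passing "inner" explicitly. A short usage sketch, under the same sqlContext and implicits assumption as the earlier examples:

    import sqlContext.implicits._

    val left  = Seq(1, 2, 3).toDS().as("l")
    val right = Seq(2, 3, 4).toDS().as("r")

    val pairs    = left.joinWith(right, $"l.value" === $"r.value")           // inner join by default
    val explicit = left.joinWith(right, $"l.value" === $"r.value", "inner")  // identical result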