spark git commit: [SPARK-3722][Docs]minor improvement and fix in docs

2014-11-14 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master 825709a0b - e421072da


[SPARK-3722][Docs]minor improvement and fix in docs

https://issues.apache.org/jira/browse/SPARK-3722

Author: WangTao barneystin...@aliyun.com

Closes #2579 from WangTaoTheTonic/docsWork and squashes the following commits:

6f91cec [WangTao] use more wording express
29d22fa [WangTao] delete the specified version link
34cb4ea [WangTao] Update running-on-yarn.md
4ee1a26 [WangTao] minor improvement and fix in docs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e421072d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e421072d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e421072d

Branch: refs/heads/master
Commit: e421072da0ea87e7056cc3f2130ddaafc731530f
Parents: 825709a
Author: WangTao barneystin...@aliyun.com
Authored: Fri Nov 14 08:09:42 2014 -0600
Committer: Thomas Graves tgra...@apache.org
Committed: Fri Nov 14 08:09:42 2014 -0600

--
 docs/configuration.md   | 2 +-
 docs/running-on-yarn.md | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e421072d/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index f0b396e..8839162 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -52,7 +52,7 @@ Then, you can supply configuration values at runtime:
   --conf spark.executor.extraJavaOptions=-XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps myApp.jar 
 {% endhighlight %}
 
-The Spark shell and 
[`spark-submit`](cluster-overview.html#launching-applications-with-spark-submit)
+The Spark shell and [`spark-submit`](submitting-applications.html)
 tool support two ways to load configurations dynamically. The first are 
command line options,
 such as `--master`, as shown above. `spark-submit` can accept any Spark 
property using the `--conf`
 flag, but uses special flags for properties that play a part in launching the 
Spark application.

http://git-wip-us.apache.org/repos/asf/spark/blob/e421072d/docs/running-on-yarn.md
--
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 2f7e498..dfe2db4 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -39,7 +39,7 @@ Most of the configs are the same for Spark on YARN as for 
other deployment modes
   tdcodespark.yarn.preserve.staging.files/code/td
   tdfalse/td
   td
-Set to true to preserve the staged files (Spark jar, app jar, distributed 
cache files) at the end of the job rather then delete them.
+Set to true to preserve the staged files (Spark jar, app jar, distributed 
cache files) at the end of the job rather than delete them.
   /td
 /tr
 tr
@@ -159,7 +159,7 @@ For example:
 lib/spark-examples*.jar \
 10
 
-The above starts a YARN client program which starts the default Application 
Master. Then SparkPi will be run as a child thread of Application Master. The 
client will periodically poll the Application Master for status updates and 
display them in the console. The client will exit once your application has 
finished running.  Refer to the Viewing Logs section below for how to see 
driver and executor logs.
+The above starts a YARN client program which starts the default Application 
Master. Then SparkPi will be run as a child thread of Application Master. The 
client will periodically poll the Application Master for status updates and 
display them in the console. The client will exit once your application has 
finished running.  Refer to the Debugging your Application section below for 
how to see driver and executor logs.
 
 To launch a Spark application in yarn-client mode, do the same, but replace 
yarn-cluster with yarn-client.  To run spark-shell:
 
@@ -181,7 +181,7 @@ In YARN terminology, executors and application masters run 
inside containers.
 
 yarn logs -applicationId app ID
 
-will print out the contents of all log files from all containers from the 
given application.
+will print out the contents of all log files from all containers from the 
given application. You can also view the container log files directly in HDFS 
using the HDFS shell or API. The directory where they are located can be found 
by looking at your YARN configs (`yarn.nodemanager.remote-app-log-dir` and 
`yarn.nodemanager.remote-app-log-dir-suffix`).
 
 When log aggregation isn't turned on, logs are retained locally on each 
machine under `YARN_APP_LOGS_DIR`, which is usually configured to `/tmp/logs` 
or `$HADOOP_HOME/logs/userlogs` depending on the Hadoop version and 
installation. Viewing logs for a container requires going to the host that 
contains them and looking in this 

spark git commit: Revert [SPARK-2703][Core]Make Tachyon related unit tests execute without deploying a Tachyon system locally.

2014-11-14 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 39257ca1b - 3219271f4


Revert [SPARK-2703][Core]Make Tachyon related unit tests execute without 
deploying a Tachyon system locally.

This reverts commit c127ff8c87fc4f3aa6f09697928832dc6d37cc0f.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3219271f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3219271f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3219271f

Branch: refs/heads/branch-1.2
Commit: 3219271f403091d4d3af4cddd08121ba538a459b
Parents: 39257ca
Author: Patrick Wendell pwend...@gmail.com
Authored: Fri Nov 14 12:34:21 2014 -0800
Committer: Patrick Wendell pwend...@gmail.com
Committed: Fri Nov 14 12:34:21 2014 -0800

--
 core/pom.xml |  7 ---
 .../org/apache/spark/storage/BlockManagerSuite.scala | 11 ++-
 project/SparkBuild.scala |  2 --
 3 files changed, 2 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3219271f/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 03eb231..492eddd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -226,13 +226,6 @@
 /dependency
 dependency
   groupIdorg.tachyonproject/groupId
-  artifactIdtachyon/artifactId
-  version0.5.0/version
-  typetest-jar/type
-  scopetest/scope
-/dependency
-dependency
-  groupIdorg.tachyonproject/groupId
   artifactIdtachyon-client/artifactId
   version0.5.0/version
   exclusions

http://git-wip-us.apache.org/repos/asf/spark/blob/3219271f/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index c5e6ccc..5554efb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -36,7 +36,6 @@ import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
-import tachyon.master.LocalTachyonCluster
 
 import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, 
SecurityManager}
 import org.apache.spark.executor.DataReadMethod
@@ -537,14 +536,9 @@ class BlockManagerSuite extends FunSuite with Matchers 
with BeforeAndAfter
   }
 
   test(tachyon storage) {
-val tachyonUnitTestEnabled = conf.getBoolean(spark.test.tachyon.enable, 
true)
+// TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 
testing jar.
+val tachyonUnitTestEnabled = conf.getBoolean(spark.test.tachyon.enable, 
false)
 if (tachyonUnitTestEnabled) {
-  val tachyonCluster = new LocalTachyonCluster(3000)
-  tachyonCluster.start()
-  val tachyonURL = tachyon.Constants.HEADER +
-tachyonCluster.getMasterHostname() + : + 
tachyonCluster.getMasterPort()
-  conf.set(spark.tachyonStore.url, tachyonURL)
-  conf.set(spark.tachyonStore.folderName, app-test)
   store = makeBlockManager(1200)
   val a1 = new Array[Byte](400)
   val a2 = new Array[Byte](400)
@@ -555,7 +549,6 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
   assert(store.getSingle(a3).isDefined, a3 was in store)
   assert(store.getSingle(a2).isDefined, a2 was in store)
   assert(store.getSingle(a1).isDefined, a1 was in store)
-  tachyonCluster.stop()
 } else {
   info(tachyon storage test disabled.)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3219271f/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index bbba642..d95d50a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -389,8 +389,6 @@ object TestSettings {
 testOptions += Tests.Argument(TestFrameworks.JUnit, -v, -a),
 // Enable Junit testing.
 libraryDependencies += com.novocode % junit-interface % 0.9 % test,
-// Enable Tachyon local testing.
-libraryDependencies += org.tachyonproject % tachyon % 0.5.0 % test 
classifier tests,
 // Only allow one test at a time, even across projects, since they run in 
the same JVM
 parallelExecution in Test := false,
 concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),


-
To unsubscribe, e-mail: 

spark git commit: [SPARK-4398][PySpark] specialize sc.parallelize(xrange)

2014-11-14 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 77e845ca7 - abd581752


[SPARK-4398][PySpark] specialize sc.parallelize(xrange)

`sc.parallelize(range(1  20), 1).count()` may take 15 seconds to finish and 
the rdd object stores the entire list, making task size very large. This PR 
adds a specialized version for xrange.

JoshRosen davies

Author: Xiangrui Meng m...@databricks.com

Closes #3264 from mengxr/SPARK-4398 and squashes the following commits:

8953c41 [Xiangrui Meng] follow davies' suggestion
cbd58e3 [Xiangrui Meng] specialize sc.parallelize(xrange)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abd58175
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abd58175
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abd58175

Branch: refs/heads/master
Commit: abd581752f9314791a688690c07ad1bb68cc09fe
Parents: 77e845c
Author: Xiangrui Meng m...@databricks.com
Authored: Fri Nov 14 12:43:17 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Fri Nov 14 12:43:17 2014 -0800

--
 python/pyspark/context.py | 25 +
 1 file changed, 21 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/abd58175/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index faa5952..b6c9914 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -289,12 +289,29 @@ class SparkContext(object):
 
 def parallelize(self, c, numSlices=None):
 
-Distribute a local Python collection to form an RDD.
+Distribute a local Python collection to form an RDD. Using xrange
+is recommended if the input represents a range for performance.
 
- sc.parallelize(range(5), 5).glom().collect()
-[[0], [1], [2], [3], [4]]
+ sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
+[[0], [2], [3], [4], [6]]
+ sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
+[[], [0], [], [2], [4]]
 
-numSlices = numSlices or self.defaultParallelism
+numSlices = int(numSlices) if numSlices is not None else 
self.defaultParallelism
+if isinstance(c, xrange):
+size = len(c)
+if size == 0:
+return self.parallelize([], numSlices)
+step = c[1] - c[0] if size  1 else 1
+start0 = c[0]
+
+def getStart(split):
+return start0 + (split * size / numSlices) * step
+
+def f(split, iterator):
+return xrange(getStart(split), getStart(split + 1), step)
+
+return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
 # Calling the Java parallelize() method with an ArrayList is too slow,
 # because it sends O(n) Py4J commands.  As an alternative, serialized
 # objects are written to a file and loaded through textFile().


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4398][PySpark] specialize sc.parallelize(xrange)

2014-11-14 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 3219271f4 - 3014803ea


[SPARK-4398][PySpark] specialize sc.parallelize(xrange)

`sc.parallelize(range(1  20), 1).count()` may take 15 seconds to finish and 
the rdd object stores the entire list, making task size very large. This PR 
adds a specialized version for xrange.

JoshRosen davies

Author: Xiangrui Meng m...@databricks.com

Closes #3264 from mengxr/SPARK-4398 and squashes the following commits:

8953c41 [Xiangrui Meng] follow davies' suggestion
cbd58e3 [Xiangrui Meng] specialize sc.parallelize(xrange)

(cherry picked from commit abd581752f9314791a688690c07ad1bb68cc09fe)
Signed-off-by: Xiangrui Meng m...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3014803e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3014803e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3014803e

Branch: refs/heads/branch-1.2
Commit: 3014803ead0aac31f36f4387c919174877525ff4
Parents: 3219271
Author: Xiangrui Meng m...@databricks.com
Authored: Fri Nov 14 12:43:17 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Fri Nov 14 12:43:25 2014 -0800

--
 python/pyspark/context.py | 25 +
 1 file changed, 21 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3014803e/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index faa5952..b6c9914 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -289,12 +289,29 @@ class SparkContext(object):
 
 def parallelize(self, c, numSlices=None):
 
-Distribute a local Python collection to form an RDD.
+Distribute a local Python collection to form an RDD. Using xrange
+is recommended if the input represents a range for performance.
 
- sc.parallelize(range(5), 5).glom().collect()
-[[0], [1], [2], [3], [4]]
+ sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
+[[0], [2], [3], [4], [6]]
+ sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
+[[], [0], [], [2], [4]]
 
-numSlices = numSlices or self.defaultParallelism
+numSlices = int(numSlices) if numSlices is not None else 
self.defaultParallelism
+if isinstance(c, xrange):
+size = len(c)
+if size == 0:
+return self.parallelize([], numSlices)
+step = c[1] - c[0] if size  1 else 1
+start0 = c[0]
+
+def getStart(split):
+return start0 + (split * size / numSlices) * step
+
+def f(split, iterator):
+return xrange(getStart(split), getStart(split + 1), step)
+
+return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
 # Calling the Java parallelize() method with an ArrayList is too slow,
 # because it sends O(n) Py4J commands.  As an alternative, serialized
 # objects are written to a file and loaded through textFile().


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [Spark Core] SPARK-4380 Edit spilling log from MB to B

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master abd581752 - 0c56a039a


[Spark Core] SPARK-4380 Edit spilling log from MB to B

https://issues.apache.org/jira/browse/SPARK-4380

Author: Hong Shen hongs...@tencent.com

Closes #3243 from shenh062326/spark_change and squashes the following commits:

4653378 [Hong Shen] Edit spilling log from MB to B
21ee960 [Hong Shen] Edit spilling log from MB to B
e9145e8 [Hong Shen] Edit spilling log from MB to B
da761c2 [Hong Shen] Edit spilling log from MB to B
946351c [Hong Shen] Edit spilling log from MB to B


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c56a039
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c56a039
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c56a039

Branch: refs/heads/master
Commit: 0c56a039a9c5b871422f0fc55ff4394bc077fb34
Parents: abd5817
Author: Hong Shen hongs...@tencent.com
Authored: Fri Nov 14 13:29:41 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 13:29:41 2014 -0800

--
 .../main/scala/org/apache/spark/util/collection/Spillable.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c56a039/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index d7dccd4..0e4c6d6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -105,7 +105,8 @@ private[spark] trait Spillable[C] {
*/
   @inline private def logSpillage(size: Long) {
 val threadId = Thread.currentThread().getId
-logInfo(Thread %d spilling in-memory map of %d MB to disk (%d time%s so 
far)
-.format(threadId, size / (1024 * 1024), _spillCount, if (_spillCount  
1) s else ))
+logInfo(Thread %d spilling in-memory map of %s to disk (%d time%s so far)
+.format(threadId, org.apache.spark.util.Utils.bytesToString(size),
+_spillCount, if (_spillCount  1) s else ))
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 0c56a039a - 5c265ccde


SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR

These descriptions are from the header of spark-daemon.sh

Author: Andrew Ash and...@andrewash.com

Closes #2518 from ash211/SPARK-3663 and squashes the following commits:

058b257 [Andrew Ash] Complete hanging clause in SPARK_PID_DIR description
a17cb4b [Andrew Ash] Update docs for default locations per SPARK-4110
af89096 [Andrew Ash] SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c265ccd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c265ccd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c265ccd

Branch: refs/heads/master
Commit: 5c265ccde0c5594899ec61f9c1ea100ddff52da7
Parents: 0c56a03
Author: Andrew Ash and...@andrewash.com
Authored: Fri Nov 14 13:33:35 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 13:33:35 2014 -0800

--
 conf/spark-env.sh.template | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5c265ccd/conf/spark-env.sh.template
--
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index f8ffbf6..0886b02 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -28,7 +28,7 @@
 # - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed 
with the job.
 # - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be 
distributed with the job.
 
-# Options for the daemons used in the standalone deploy mode:
+# Options for the daemons used in the standalone deploy mode
 # - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
 # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for 
the master
 # - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. 
-Dx=y)
@@ -41,3 +41,10 @@
 # - SPARK_HISTORY_OPTS, to set config properties only for the history server 
(e.g. -Dx=y)
 # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. 
-Dx=y)
 # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
+
+# Generic options for the daemons used in the standalone deploy mode
+# - SPARK_CONF_DIR  Alternate conf dir. (Default: ${SPARK_HOME}/conf)
+# - SPARK_LOG_DIR   Where log files are stored.  (Default: 
${SPARK_HOME}/logs)
+# - SPARK_PID_DIR   Where the pid file is stored. (Default: /tmp)
+# - SPARK_IDENT_STRING  A string representing this instance of spark. 
(Default: $USER)
+# - SPARK_NICENESS  The scheduling priority for daemons. (Default: 0)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 d579b3989 - 204eaf165


SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR

These descriptions are from the header of spark-daemon.sh

Author: Andrew Ash and...@andrewash.com

Closes #2518 from ash211/SPARK-3663 and squashes the following commits:

058b257 [Andrew Ash] Complete hanging clause in SPARK_PID_DIR description
a17cb4b [Andrew Ash] Update docs for default locations per SPARK-4110
af89096 [Andrew Ash] SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR

(cherry picked from commit 5c265ccde0c5594899ec61f9c1ea100ddff52da7)
Signed-off-by: Andrew Or and...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/204eaf16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/204eaf16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/204eaf16

Branch: refs/heads/branch-1.2
Commit: 204eaf1653b2bdd0befe364392baa32c31ce0d3e
Parents: d579b39
Author: Andrew Ash and...@andrewash.com
Authored: Fri Nov 14 13:33:35 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 13:33:43 2014 -0800

--
 conf/spark-env.sh.template | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/204eaf16/conf/spark-env.sh.template
--
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index f8ffbf6..0886b02 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -28,7 +28,7 @@
 # - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed 
with the job.
 # - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be 
distributed with the job.
 
-# Options for the daemons used in the standalone deploy mode:
+# Options for the daemons used in the standalone deploy mode
 # - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
 # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for 
the master
 # - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. 
-Dx=y)
@@ -41,3 +41,10 @@
 # - SPARK_HISTORY_OPTS, to set config properties only for the history server 
(e.g. -Dx=y)
 # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. 
-Dx=y)
 # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
+
+# Generic options for the daemons used in the standalone deploy mode
+# - SPARK_CONF_DIR  Alternate conf dir. (Default: ${SPARK_HOME}/conf)
+# - SPARK_LOG_DIR   Where log files are stored.  (Default: 
${SPARK_HOME}/logs)
+# - SPARK_PID_DIR   Where the pid file is stored. (Default: /tmp)
+# - SPARK_IDENT_STRING  A string representing this instance of spark. 
(Default: $USER)
+# - SPARK_NICENESS  The scheduling priority for daemons. (Default: 0)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump page in yarn-cluster mode

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 5c265ccde - 156cf


[SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump page in 
yarn-cluster mode

In yarn-cluster mode, the Web UI is running behind a yarn proxy server. Some 
features(or bugs?) of yarn proxy server will break the links for thread dump.

1. Yarn proxy server will do http redirect internally, so if opening 
`http://example.com:8088/cluster/app/application_1415344371838_0012/executors`, 
it will fetch 
`http://example.com:8088/cluster/app/application_1415344371838_0012/executors/` 
and return the content but won't change the link in the browser. Then when a 
user clicks `Thread Dump`, it will jump to 
`http://example.com:8088/proxy/application_1415344371838_0012/threadDump/?executorId=2`.
 This is a wrong link. The correct link should be 
`http://example.com:8088/proxy/application_1415344371838_0012/executors/threadDump/?executorId=2`.

Adding / to the tab links will fix it.

2. Yarn proxy server has a bug about the URL encode/decode. When a user 
accesses 
`http://example.com:8088/proxy/application_1415344371838_0006/executors/threadDump/?executorId=%3Cdriver%3E`,
 the yarn proxy server will require 
`http://example.com:36429/executors/threadDump/?executorId=%25253Cdriver%25253E`.
 But Spark web server expects 
`http://example.com:36429/executors/threadDump/?executorId=%3Cdriver%3E`. 
Related to [YARN-2844](https://issues.apache.org/jira/browse/YARN-2844).

For now, it's a tricky approach to bypass the yarn bug.

![threaddump](https://cloud.githubusercontent.com/assets/1000778/4972567/d1ccba64-68ad-11e4-983e-257530cef35a.png)

Author: zsxwing zsxw...@gmail.com

Closes #3183 from zsxwing/SPARK-4313 and squashes the following commits:

3379ca8 [zsxwing] Encode the executor id in the thread dump link and update the 
comment
abfa063 [zsxwing] Fix link issue of the executor thread dump page in 
yarn-cluster mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/156cf333
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/156cf333
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/156cf333

Branch: refs/heads/master
Commit: 156cfdcd93304eb5240f5a6466a3a0311957
Parents: 5c265cc
Author: zsxwing zsxw...@gmail.com
Authored: Fri Nov 14 13:36:13 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 13:36:13 2014 -0800

--
 .../src/main/scala/org/apache/spark/ui/UIUtils.scala |  2 +-
 .../spark/ui/exec/ExecutorThreadDumpPage.scala   | 15 ++-
 .../org/apache/spark/ui/exec/ExecutorsPage.scala |  4 +++-
 3 files changed, 18 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/156cf333/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 3312671..7bc1e24 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -175,7 +175,7 @@ private[spark] object UIUtils extends Logging {
 val shortAppName = if (appName.length  36) appName else appName.take(32) 
+ ...
 val header = activeTab.headerTabs.map { tab =
   li class={if (tab == activeTab) active else }
-a href={prependBaseUri(activeTab.basePath, / + 
tab.prefix)}{tab.name}/a
+a href={prependBaseUri(activeTab.basePath, / + tab.prefix + 
/)}{tab.name}/a
   /li
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/156cf333/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala 
b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
index e9c755e..c82730f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ui.exec
 
+import java.net.URLDecoder
 import javax.servlet.http.HttpServletRequest
 
 import scala.util.Try
@@ -29,7 +30,19 @@ private[ui] class ExecutorThreadDumpPage(parent: 
ExecutorsTab) extends WebUIPage
   private val sc = parent.sc
 
   def render(request: HttpServletRequest): Seq[Node] = {
-val executorId = Option(request.getParameter(executorId)).getOrElse {
+val executorId = Option(request.getParameter(executorId)).map {
+  executorId =
+// Due to YARN-2844, driver in the url will be encoded to 
%25253Cdriver%25253E when
+// running in yarn-cluster mode. `request.getParameter(executorId)` 
will 

spark git commit: [SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump page in yarn-cluster mode

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 204eaf165 - 88278241e


[SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump page in 
yarn-cluster mode

In yarn-cluster mode, the Web UI is running behind a yarn proxy server. Some 
features(or bugs?) of yarn proxy server will break the links for thread dump.

1. Yarn proxy server will do http redirect internally, so if opening 
`http://example.com:8088/cluster/app/application_1415344371838_0012/executors`, 
it will fetch 
`http://example.com:8088/cluster/app/application_1415344371838_0012/executors/` 
and return the content but won't change the link in the browser. Then when a 
user clicks `Thread Dump`, it will jump to 
`http://example.com:8088/proxy/application_1415344371838_0012/threadDump/?executorId=2`.
 This is a wrong link. The correct link should be 
`http://example.com:8088/proxy/application_1415344371838_0012/executors/threadDump/?executorId=2`.

Adding / to the tab links will fix it.

2. Yarn proxy server has a bug about the URL encode/decode. When a user 
accesses 
`http://example.com:8088/proxy/application_1415344371838_0006/executors/threadDump/?executorId=%3Cdriver%3E`,
 the yarn proxy server will require 
`http://example.com:36429/executors/threadDump/?executorId=%25253Cdriver%25253E`.
 But Spark web server expects 
`http://example.com:36429/executors/threadDump/?executorId=%3Cdriver%3E`. 
Related to [YARN-2844](https://issues.apache.org/jira/browse/YARN-2844).

For now, it's a tricky approach to bypass the yarn bug.

![threaddump](https://cloud.githubusercontent.com/assets/1000778/4972567/d1ccba64-68ad-11e4-983e-257530cef35a.png)

Author: zsxwing zsxw...@gmail.com

Closes #3183 from zsxwing/SPARK-4313 and squashes the following commits:

3379ca8 [zsxwing] Encode the executor id in the thread dump link and update the 
comment
abfa063 [zsxwing] Fix link issue of the executor thread dump page in 
yarn-cluster mode

(cherry picked from commit 156cfdcd93304eb5240f5a6466a3a0311957)
Signed-off-by: Andrew Or and...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88278241
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88278241
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88278241

Branch: refs/heads/branch-1.2
Commit: 88278241e9d9ca17db2f7c20d4434c32b7deff92
Parents: 204eaf1
Author: zsxwing zsxw...@gmail.com
Authored: Fri Nov 14 13:36:13 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 13:36:20 2014 -0800

--
 .../src/main/scala/org/apache/spark/ui/UIUtils.scala |  2 +-
 .../spark/ui/exec/ExecutorThreadDumpPage.scala   | 15 ++-
 .../org/apache/spark/ui/exec/ExecutorsPage.scala |  4 +++-
 3 files changed, 18 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88278241/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 3312671..7bc1e24 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -175,7 +175,7 @@ private[spark] object UIUtils extends Logging {
 val shortAppName = if (appName.length  36) appName else appName.take(32) 
+ ...
 val header = activeTab.headerTabs.map { tab =
   li class={if (tab == activeTab) active else }
-a href={prependBaseUri(activeTab.basePath, / + 
tab.prefix)}{tab.name}/a
+a href={prependBaseUri(activeTab.basePath, / + tab.prefix + 
/)}{tab.name}/a
   /li
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/88278241/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala 
b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
index e9c755e..c82730f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ui.exec
 
+import java.net.URLDecoder
 import javax.servlet.http.HttpServletRequest
 
 import scala.util.Try
@@ -29,7 +30,19 @@ private[ui] class ExecutorThreadDumpPage(parent: 
ExecutorsTab) extends WebUIPage
   private val sc = parent.sc
 
   def render(request: HttpServletRequest): Seq[Node] = {
-val executorId = Option(request.getParameter(executorId)).getOrElse {
+val executorId = Option(request.getParameter(executorId)).map {
+  executorId =
+// Due to YARN-2844, driver in the url will be 

spark git commit: Update failed assert text to match code in SizeEstimatorSuite

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 156cf - c258db9ed


Update failed assert text to match code in SizeEstimatorSuite

Author: Jeff Hammerbacher jeff.hammerbac...@gmail.com

Closes #3242 from hammer/patch-1 and squashes the following commits:

f88d635 [Jeff Hammerbacher] Update failed assert text to match code in 
SizeEstimatorSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c258db9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c258db9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c258db9e

Branch: refs/heads/master
Commit: c258db9ed4104b6eefe9f55f3e3959a3c46c2900
Parents: 156cf33
Author: Jeff Hammerbacher jeff.hammerbac...@gmail.com
Authored: Fri Nov 14 13:37:48 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 13:37:48 2014 -0800

--
 core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c258db9e/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala 
b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index f9d1af8..0ea2d13 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -118,7 +118,7 @@ class SizeEstimatorSuite
 // TODO: If we sample 100 elements, this should always be 4176 ?
 val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1))
 assert(estimatedSize = 4000, Estimated size  + estimatedSize +  should 
be more than 4000)
-assert(estimatedSize = 4200, Estimated size  + estimatedSize +  should 
be less than 4100)
+assert(estimatedSize = 4200, Estimated size  + estimatedSize +  should 
be less than 4200)
   }
 
   test(32-bit arch) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Update failed assert text to match code in SizeEstimatorSuite

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 88278241e - e7f957437


Update failed assert text to match code in SizeEstimatorSuite

Author: Jeff Hammerbacher jeff.hammerbac...@gmail.com

Closes #3242 from hammer/patch-1 and squashes the following commits:

f88d635 [Jeff Hammerbacher] Update failed assert text to match code in 
SizeEstimatorSuite

(cherry picked from commit c258db9ed4104b6eefe9f55f3e3959a3c46c2900)
Signed-off-by: Andrew Or and...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7f95743
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7f95743
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7f95743

Branch: refs/heads/branch-1.2
Commit: e7f957437ad013d16992a7ab12da58fa8eb6a880
Parents: 8827824
Author: Jeff Hammerbacher jeff.hammerbac...@gmail.com
Authored: Fri Nov 14 13:37:48 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 13:37:56 2014 -0800

--
 core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e7f95743/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala 
b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index f9d1af8..0ea2d13 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -118,7 +118,7 @@ class SizeEstimatorSuite
 // TODO: If we sample 100 elements, this should always be 4176 ?
 val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1))
 assert(estimatedSize = 4000, Estimated size  + estimatedSize +  should 
be more than 4000)
-assert(estimatedSize = 4200, Estimated size  + estimatedSize +  should 
be less than 4100)
+assert(estimatedSize = 4200, Estimated size  + estimatedSize +  should 
be less than 4200)
   }
 
   test(32-bit arch) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[1/2] spark git commit: [SPARK-4239] [SQL] support view in HiveQl

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master c258db9ed - ade72c436


http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_cast-8-2cc0c576f0a008abf5bdf3308d500869
--
diff --git 
a/sql/hive/src/test/resources/golden/view_cast-8-2cc0c576f0a008abf5bdf3308d500869
 
b/sql/hive/src/test/resources/golden/view_cast-8-2cc0c576f0a008abf5bdf3308d500869
new file mode 100644
index 000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_cast-9-f306bf3ad1c2a99f6f1843db44d7dfb4
--
diff --git 
a/sql/hive/src/test/resources/golden/view_cast-9-f306bf3ad1c2a99f6f1843db44d7dfb4
 
b/sql/hive/src/test/resources/golden/view_cast-9-f306bf3ad1c2a99f6f1843db44d7dfb4
new file mode 100644
index 000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_inputs-0-9e67dfd1d595ab8b1935b789645f76c0
--
diff --git 
a/sql/hive/src/test/resources/golden/view_inputs-0-9e67dfd1d595ab8b1935b789645f76c0
 
b/sql/hive/src/test/resources/golden/view_inputs-0-9e67dfd1d595ab8b1935b789645f76c0
new file mode 100644
index 000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_inputs-1-5af97e73bc3841793440105aae766bbe
--
diff --git 
a/sql/hive/src/test/resources/golden/view_inputs-1-5af97e73bc3841793440105aae766bbe
 
b/sql/hive/src/test/resources/golden/view_inputs-1-5af97e73bc3841793440105aae766bbe
new file mode 100644
index 000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/ade72c43/sql/hive/src/test/resources/golden/view_inputs-2-626fa3664754125edc44b7ca7f8630db
--
diff --git 
a/sql/hive/src/test/resources/golden/view_inputs-2-626fa3664754125edc44b7ca7f8630db
 
b/sql/hive/src/test/resources/golden/view_inputs-2-626fa3664754125edc44b7ca7f8630db
new file mode 100644
index 000..1f3d8a7
--- /dev/null
+++ 
b/sql/hive/src/test/resources/golden/view_inputs-2-626fa3664754125edc44b7ca7f8630db
@@ -0,0 +1 @@
+1028


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4245][SQL] Fix containsNull of the result ArrayType of CreateArray expression.

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 51b053a31 - 4bdeeb7d2


[SPARK-4245][SQL] Fix containsNull of the result ArrayType of CreateArray 
expression.

The `containsNull` of the result `ArrayType` of `CreateArray` should be `true` 
only if the children is empty or there exists nullable child.

Author: Takuya UESHIN ues...@happy-camper.st

Closes #3110 from ueshin/issues/SPARK-4245 and squashes the following commits:

6f64746 [Takuya UESHIN] Move equalsIgnoreNullability method into DataType.
5a90e02 [Takuya UESHIN] Refine InsertIntoHiveType and add some comments.
cbecba8 [Takuya UESHIN] Fix a test title.
884ec37 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-4245
3c5274b [Takuya UESHIN] Add tests to insert data of types ArrayType / MapType / 
StructType with nullability is false into Hive table.
41a94a9 [Takuya UESHIN] Replace InsertIntoTable with InsertIntoHiveTable if 
data types ignoring nullability are same.
43e6ef5 [Takuya UESHIN] Fix containsNull for empty array.
778e997 [Takuya UESHIN] Fix containsNull of the result ArrayType of CreateArray 
expression.

(cherry picked from commit bbd8f5bee81d5788c356977c173dd1edc42c77a3)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bdeeb7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bdeeb7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bdeeb7d

Branch: refs/heads/branch-1.2
Commit: 4bdeeb7d25453b9b50c7dc23a5c7f588754f0e52
Parents: 51b053a
Author: Takuya UESHIN ues...@happy-camper.st
Authored: Fri Nov 14 14:21:16 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 14:21:32 2014 -0800

--
 .../sql/catalyst/expressions/complexTypes.scala |  4 +-
 .../spark/sql/catalyst/types/dataTypes.scala| 21 
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 27 +++
 .../apache/spark/sql/hive/HiveStrategies.scala  |  6 ++-
 .../sql/hive/InsertIntoHiveTableSuite.scala | 50 
 5 files changed, 106 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4bdeeb7d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 19421e5..917b346 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -115,7 +115,9 @@ case class CreateArray(children: Seq[Expression]) extends 
Expression {
 
   override def dataType: DataType = {
 assert(resolved, sInvalid dataType of mixed ArrayType 
${childTypes.mkString(,)})
-ArrayType(childTypes.headOption.getOrElse(NullType))
+ArrayType(
+  childTypes.headOption.getOrElse(NullType),
+  containsNull = children.exists(_.nullable))
   }
 
   override def nullable: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/4bdeeb7d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 5dd19dd..ff1dc03 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -171,6 +171,27 @@ object DataType {
   case _ =
 }
   }
+
+  /**
+   * Compares two types, ignoring nullability of ArrayType, MapType, 
StructType.
+   */
+  def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
+(left, right) match {
+  case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =
+equalsIgnoreNullability(leftElementType, rightElementType)
+  case (MapType(leftKeyType, leftValueType, _), MapType(rightKeyType, 
rightValueType, _)) =
+equalsIgnoreNullability(leftKeyType, rightKeyType) 
+equalsIgnoreNullability(leftValueType, rightValueType)
+  case (StructType(leftFields), StructType(rightFields)) =
+leftFields.size == rightFields.size 
+leftFields.zip(rightFields)
+  .forall{
+case (left, right) =
+  left.name == right.name  
equalsIgnoreNullability(left.dataType, right.dataType)
+  }
+  case (left, right) = left == right
+}
+  }
 }
 
 abstract class DataType {


spark git commit: SPARK-4375. no longer require -Pscala-2.10

2014-11-14 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master bbd8f5bee - f5f757e4e


SPARK-4375. no longer require -Pscala-2.10

It seems like the winds might have moved away from this approach, but wanted to 
post the PR anyway because I got it working and to show what it would look like.

Author: Sandy Ryza sa...@cloudera.com

Closes #3239 from sryza/sandy-spark-4375 and squashes the following commits:

0ffbe95 [Sandy Ryza] Enable -Dscala-2.11 in sbt
cd42d94 [Sandy Ryza] Update doc
f6644c3 [Sandy Ryza] SPARK-4375 take 2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5f757e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5f757e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5f757e4

Branch: refs/heads/master
Commit: f5f757e4ed80759dc5668c63d5663651689f8da8
Parents: bbd8f5b
Author: Sandy Ryza sa...@cloudera.com
Authored: Fri Nov 14 14:21:57 2014 -0800
Committer: Patrick Wendell pwend...@gmail.com
Committed: Fri Nov 14 14:21:57 2014 -0800

--
 docs/building-spark.md   |   4 +-
 examples/pom.xml |  65 ++-
 pom.xml  |  22 +---
 project/SparkBuild.scala |   3 ++
 repl/pom.xml | 117 +++---
 sql/catalyst/pom.xml |  11 +---
 sql/hive/pom.xml |   3 --
 7 files changed, 54 insertions(+), 171 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5f757e4/docs/building-spark.md
--
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 20ba7da..bb18414 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -113,9 +113,9 @@ mvn -Pyarn -Phive -Phive-thriftserver-0.12.0 -Phadoop-2.4 
-Dhadoop.version=2.4.0
 {% endhighlight %}
 
 # Building for Scala 2.11
-To produce a Spark package compiled with Scala 2.11, use the `-Pscala-2.11` 
profile:
+To produce a Spark package compiled with Scala 2.11, use the `-Dscala-2.11` 
property:
 
-mvn -Pyarn -Phadoop-2.4 -Pscala-2.11 -DskipTests clean package
+mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package
 
 Scala 2.11 support in Spark is experimental and does not support a few 
features.
 Specifically, Spark's external Kafka library and JDBC component are not yet

http://git-wip-us.apache.org/repos/asf/spark/blob/f5f757e4/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 2ec5728..2752ce3 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -389,11 +389,11 @@
   /properties
 /profile
 profile
-  !-- We add source directories specific to Scala 2.10 and 2.11 since 
some examples
-   work only in one and not the other --
+  !-- We add a source directory specific to Scala 2.10 since Kafka and 
Algebird
+   only work with it --
   idscala-2.10/id
   activation
-activeByDefaulttrue/activeByDefault
+propertyname!scala-2.11/name/property
   /activation
   dependencies
 dependency
@@ -427,65 +427,6 @@
   /sources
 /configuration
   /execution
-  execution
-idadd-scala-test-sources/id
-phasegenerate-test-sources/phase
-goals
-  goaladd-test-source/goal
-/goals
-configuration
-  sources
-sourcesrc/test/scala/source
-sourcescala-2.10/src/test/scala/source
-sourcescala-2.10/src/test/java/source
-  /sources
-/configuration
-  /execution
-/executions
-  /plugin
-/plugins
-  /build
-/profile
-profile
-  idscala-2.11/id
-  activation
-activeByDefaultfalse/activeByDefault
-  /activation
-  dependencies
-!-- Streaming Kafka and zeromq modules are disabled for now. --
-  /dependencies
-  build
-plugins
-  plugin
-groupIdorg.codehaus.mojo/groupId
-artifactIdbuild-helper-maven-plugin/artifactId
-executions
-  execution
-idadd-scala-sources/id
-phasegenerate-sources/phase
-goals
-  goaladd-source/goal
-/goals
-configuration
-  sources
-sourcesrc/main/scala/source
-sourcescala-2.11/src/main/scala/source
-  /sources
-/configuration
-  /execution
-  execution
-idadd-scala-test-sources/id
-phasegenerate-test-sources/phase
-goals

spark git commit: SPARK-4375. no longer require -Pscala-2.10

2014-11-14 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 4bdeeb7d2 - d90ddf12b


SPARK-4375. no longer require -Pscala-2.10

It seems like the winds might have moved away from this approach, but wanted to 
post the PR anyway because I got it working and to show what it would look like.

Author: Sandy Ryza sa...@cloudera.com

Closes #3239 from sryza/sandy-spark-4375 and squashes the following commits:

0ffbe95 [Sandy Ryza] Enable -Dscala-2.11 in sbt
cd42d94 [Sandy Ryza] Update doc
f6644c3 [Sandy Ryza] SPARK-4375 take 2

(cherry picked from commit f5f757e4ed80759dc5668c63d5663651689f8da8)
Signed-off-by: Patrick Wendell pwend...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d90ddf12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d90ddf12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d90ddf12

Branch: refs/heads/branch-1.2
Commit: d90ddf12b6bea2162e982e800c96d2c94f66b347
Parents: 4bdeeb7
Author: Sandy Ryza sa...@cloudera.com
Authored: Fri Nov 14 14:21:57 2014 -0800
Committer: Patrick Wendell pwend...@gmail.com
Committed: Fri Nov 14 14:22:13 2014 -0800

--
 docs/building-spark.md   |   4 +-
 examples/pom.xml |  65 ++-
 pom.xml  |  22 +---
 project/SparkBuild.scala |   3 ++
 repl/pom.xml | 117 +++---
 sql/catalyst/pom.xml |  11 +---
 sql/hive/pom.xml |   3 --
 7 files changed, 54 insertions(+), 171 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d90ddf12/docs/building-spark.md
--
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 20ba7da..bb18414 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -113,9 +113,9 @@ mvn -Pyarn -Phive -Phive-thriftserver-0.12.0 -Phadoop-2.4 
-Dhadoop.version=2.4.0
 {% endhighlight %}
 
 # Building for Scala 2.11
-To produce a Spark package compiled with Scala 2.11, use the `-Pscala-2.11` 
profile:
+To produce a Spark package compiled with Scala 2.11, use the `-Dscala-2.11` 
property:
 
-mvn -Pyarn -Phadoop-2.4 -Pscala-2.11 -DskipTests clean package
+mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package
 
 Scala 2.11 support in Spark is experimental and does not support a few 
features.
 Specifically, Spark's external Kafka library and JDBC component are not yet

http://git-wip-us.apache.org/repos/asf/spark/blob/d90ddf12/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 2ec5728..2752ce3 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -389,11 +389,11 @@
   /properties
 /profile
 profile
-  !-- We add source directories specific to Scala 2.10 and 2.11 since 
some examples
-   work only in one and not the other --
+  !-- We add a source directory specific to Scala 2.10 since Kafka and 
Algebird
+   only work with it --
   idscala-2.10/id
   activation
-activeByDefaulttrue/activeByDefault
+propertyname!scala-2.11/name/property
   /activation
   dependencies
 dependency
@@ -427,65 +427,6 @@
   /sources
 /configuration
   /execution
-  execution
-idadd-scala-test-sources/id
-phasegenerate-test-sources/phase
-goals
-  goaladd-test-source/goal
-/goals
-configuration
-  sources
-sourcesrc/test/scala/source
-sourcescala-2.10/src/test/scala/source
-sourcescala-2.10/src/test/java/source
-  /sources
-/configuration
-  /execution
-/executions
-  /plugin
-/plugins
-  /build
-/profile
-profile
-  idscala-2.11/id
-  activation
-activeByDefaultfalse/activeByDefault
-  /activation
-  dependencies
-!-- Streaming Kafka and zeromq modules are disabled for now. --
-  /dependencies
-  build
-plugins
-  plugin
-groupIdorg.codehaus.mojo/groupId
-artifactIdbuild-helper-maven-plugin/artifactId
-executions
-  execution
-idadd-scala-sources/id
-phasegenerate-sources/phase
-goals
-  goaladd-source/goal
-/goals
-configuration
-  sources
-sourcesrc/main/scala/source
-sourcescala-2.11/src/main/scala/source
-  /sources
-/configuration
-  /execution
-  

spark git commit: [SPARK-4333][SQL] Correctly log number of iterations in RuleExecutor

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master f5f757e4e - 0cbdb01e1


[SPARK-4333][SQL]  Correctly log number of iterations in RuleExecutor

When iterator of RuleExecutor breaks, the num of iterator should be (iteration 
- 1) not (iteration ).Because log looks like Fixed point reached for batch 
${batch.name} after 3 iterations., but it did 2 iterations really!

Author: DoingDone9 799203...@qq.com

Closes #3180 from DoingDone9/issue_01 and squashes the following commits:

571e2ed [DoingDone9] Update RuleExecutor.scala
46514b6 [DoingDone9] When iterator of RuleExecutor breaks, the num of iterator 
should be iteration - 1 not iteration.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cbdb01e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cbdb01e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cbdb01e

Branch: refs/heads/master
Commit: 0cbdb01e1c817e71c4f80de05c4e5bb11510b368
Parents: f5f757e
Author: DoingDone9 799203...@qq.com
Authored: Fri Nov 14 14:28:06 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 14:28:06 2014 -0800

--
 .../scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0cbdb01e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index d192b15..c441f0b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -79,7 +79,8 @@ abstract class RuleExecutor[TreeType : TreeNode[_]] extends 
Logging {
 }
 
 if (curPlan.fastEquals(lastPlan)) {
-  logTrace(sFixed point reached for batch ${batch.name} after 
$iteration iterations.)
+  logTrace(
+sFixed point reached for batch ${batch.name} after ${iteration - 
1} iterations.)
   continue = false
 }
 lastPlan = curPlan


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4333][SQL] Correctly log number of iterations in RuleExecutor

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 d90ddf12b - f8810b6a5


[SPARK-4333][SQL]  Correctly log number of iterations in RuleExecutor

When iterator of RuleExecutor breaks, the num of iterator should be (iteration 
- 1) not (iteration ).Because log looks like Fixed point reached for batch 
${batch.name} after 3 iterations., but it did 2 iterations really!

Author: DoingDone9 799203...@qq.com

Closes #3180 from DoingDone9/issue_01 and squashes the following commits:

571e2ed [DoingDone9] Update RuleExecutor.scala
46514b6 [DoingDone9] When iterator of RuleExecutor breaks, the num of iterator 
should be iteration - 1 not iteration.

(cherry picked from commit 0cbdb01e1c817e71c4f80de05c4e5bb11510b368)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8810b6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8810b6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8810b6a

Branch: refs/heads/branch-1.2
Commit: f8810b6a572f314ab0b88899172d8fa2b78e014f
Parents: d90ddf1
Author: DoingDone9 799203...@qq.com
Authored: Fri Nov 14 14:28:06 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 14:28:19 2014 -0800

--
 .../scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f8810b6a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index d192b15..c441f0b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -79,7 +79,8 @@ abstract class RuleExecutor[TreeType : TreeNode[_]] extends 
Logging {
 }
 
 if (curPlan.fastEquals(lastPlan)) {
-  logTrace(sFixed point reached for batch ${batch.name} after 
$iteration iterations.)
+  logTrace(
+sFixed point reached for batch ${batch.name} after ${iteration - 
1} iterations.)
   continue = false
 }
 lastPlan = curPlan


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector

2014-11-14 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 f8810b6a5 - 5b63158ac


[SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka 
connector

Add ReliableKafkaReceiver in Kafka connector to prevent data loss if WAL in 
Spark Streaming is enabled. Details and design doc can be seen in 
[SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).

Author: jerryshao saisai.s...@intel.com
Author: Tathagata Das tathagata.das1...@gmail.com
Author: Saisai Shao saisai.s...@intel.com

Closes #2991 from jerryshao/kafka-refactor and squashes the following commits:

5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refectored KafkaStreamSuiteBased to eliminate 
KafkaTestUtils and made Java more robust.
fab14c7 [Tathagata Das] minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted 
to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver

(cherry picked from commit 5930f64bf0d2516304b21bd49eac361a54caabdd)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b63158a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b63158a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b63158a

Branch: refs/heads/branch-1.2
Commit: 5b63158ac2100627ae4a77f3a89ae038e5b6be90
Parents: f8810b6
Author: jerryshao saisai.s...@intel.com
Authored: Fri Nov 14 14:33:37 2014 -0800
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Fri Nov 14 14:33:56 2014 -0800

--
 .../streaming/kafka/KafkaInputDStream.scala |  33 ++-
 .../spark/streaming/kafka/KafkaUtils.scala  |   4 +-
 .../streaming/kafka/ReliableKafkaReceiver.scala | 282 +++
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  44 +--
 .../streaming/kafka/KafkaStreamSuite.scala  | 216 +++---
 .../kafka/ReliableKafkaStreamSuite.scala| 140 +
 project/MimaExcludes.scala  |   4 +
 .../streaming/receiver/BlockGenerator.scala |  55 +++-
 .../receiver/ReceiverSupervisorImpl.scala   |   8 +-
 .../apache/spark/streaming/ReceiverSuite.scala  |   8 +-
 10 files changed, 651 insertions(+), 143 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5b63158a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
--
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 28ac592..4d26b64 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.util.Properties
+
 import scala.collection.Map
 import scala.reflect.{classTag, ClassTag}
 
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
+import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, 
ConsumerConnector}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
@@ -32,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -51,12 +51,16 @@ class KafkaInputDStream[
 @transient ssc_ : StreamingContext,
 kafkaParams: Map[String, String],
 topics: Map[String, Int],
+useReliableReceiver: Boolean,
 storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] 

spark git commit: [SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector

2014-11-14 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 0cbdb01e1 - 5930f64bf


[SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka 
connector

Add ReliableKafkaReceiver in Kafka connector to prevent data loss if WAL in 
Spark Streaming is enabled. Details and design doc can be seen in 
[SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).

Author: jerryshao saisai.s...@intel.com
Author: Tathagata Das tathagata.das1...@gmail.com
Author: Saisai Shao saisai.s...@intel.com

Closes #2991 from jerryshao/kafka-refactor and squashes the following commits:

5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refectored KafkaStreamSuiteBased to eliminate 
KafkaTestUtils and made Java more robust.
fab14c7 [Tathagata Das] minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted 
to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5930f64b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5930f64b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5930f64b

Branch: refs/heads/master
Commit: 5930f64bf0d2516304b21bd49eac361a54caabdd
Parents: 0cbdb01
Author: jerryshao saisai.s...@intel.com
Authored: Fri Nov 14 14:33:37 2014 -0800
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Fri Nov 14 14:33:37 2014 -0800

--
 .../streaming/kafka/KafkaInputDStream.scala |  33 ++-
 .../spark/streaming/kafka/KafkaUtils.scala  |   4 +-
 .../streaming/kafka/ReliableKafkaReceiver.scala | 282 +++
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  44 +--
 .../streaming/kafka/KafkaStreamSuite.scala  | 216 +++---
 .../kafka/ReliableKafkaStreamSuite.scala| 140 +
 project/MimaExcludes.scala  |   4 +
 .../streaming/receiver/BlockGenerator.scala |  55 +++-
 .../receiver/ReceiverSupervisorImpl.scala   |   8 +-
 .../apache/spark/streaming/ReceiverSuite.scala  |   8 +-
 10 files changed, 651 insertions(+), 143 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5930f64b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
--
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 28ac592..4d26b64 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.util.Properties
+
 import scala.collection.Map
 import scala.reflect.{classTag, ClassTag}
 
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
+import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, 
ConsumerConnector}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
@@ -32,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -51,12 +51,16 @@ class KafkaInputDStream[
 @transient ssc_ : StreamingContext,
 kafkaParams: Map[String, String],
 topics: Map[String, Int],
+useReliableReceiver: Boolean,
 storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-.asInstanceOf[Receiver[(K, V)]]
+if 

spark git commit: [SPARK-4390][SQL] Handle NaN cast to decimal correctly

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 5b63158ac - 0dd924178


[SPARK-4390][SQL] Handle NaN cast to decimal correctly

Author: Michael Armbrust mich...@databricks.com

Closes #3256 from marmbrus/NanDecimal and squashes the following commits:

4c3ba46 [Michael Armbrust] fix style
d360f83 [Michael Armbrust] Handle NaN cast to decimal

(cherry picked from commit a0300ea32a9d92bd51c72930bc3979087b0082b2)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0dd92417
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0dd92417
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0dd92417

Branch: refs/heads/branch-1.2
Commit: 0dd9241783b01815b68059067c72f36b8d05dddf
Parents: 5b63158
Author: Michael Armbrust mich...@databricks.com
Authored: Fri Nov 14 14:56:57 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 14:57:09 2014 -0800

--
 .../scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 6 +-
 .../golden/NaN to Decimal-0-6ca781bc343025635d72321ef0a9d425   | 1 +
 .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala   | 3 +++
 3 files changed, 9 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0dd92417/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 55319e7..34697a1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -290,7 +290,11 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
 case LongType =
   b = changePrecision(Decimal(b.asInstanceOf[Long]), target)
 case x: NumericType =  // All other numeric types can be represented 
precisely as Doubles
-  b = 
changePrecision(Decimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)), 
target)
+  b = try {
+
changePrecision(Decimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)), 
target)
+  } catch {
+case _: NumberFormatException = null
+  }
   }
 
   // DoubleConverter

http://git-wip-us.apache.org/repos/asf/spark/blob/0dd92417/sql/hive/src/test/resources/golden/NaN
 to Decimal-0-6ca781bc343025635d72321ef0a9d425
--
diff --git a/sql/hive/src/test/resources/golden/NaN to 
Decimal-0-6ca781bc343025635d72321ef0a9d425 
b/sql/hive/src/test/resources/golden/NaN to 
Decimal-0-6ca781bc343025635d72321ef0a9d425
new file mode 100644
index 000..7951def
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/NaN to 
Decimal-0-6ca781bc343025635d72321ef0a9d425  
@@ -0,0 +1 @@
+NULL

http://git-wip-us.apache.org/repos/asf/spark/blob/0dd92417/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 684d228..0dd766f 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -56,6 +56,9 @@ class HiveQuerySuite extends HiveComparisonTest with 
BeforeAndAfter {
 Locale.setDefault(originalLocale)
   }
 
+  createQueryTest(NaN to Decimal,
+SELECT CAST(CAST('NaN' AS DOUBLE) AS DECIMAL(1,1)) FROM src LIMIT 1)
+
   createQueryTest(constant null testing,
 SELECT
   |IF(FALSE, CAST(NULL AS STRING), CAST(1 AS STRING)) AS COL1,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4391][SQL] Configure parquet filters using SQLConf

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master a0300ea32 - e47c38763


[SPARK-4391][SQL] Configure parquet filters using SQLConf

This is more uniform with the rest of SQL configuration and allows it to be 
turned on and off without restarting the SparkContext.  In this PR I also turn 
off filter pushdown by default due to a number of outstanding issues (in 
particular SPARK-4258).  When those are fixed we should turn it back on by 
default.

Author: Michael Armbrust mich...@databricks.com

Closes #3258 from marmbrus/parquetFilters and squashes the following commits:

5655bfe [Michael Armbrust] Remove extra line.
15e9a98 [Michael Armbrust] Enable filters for tests
75afd39 [Michael Armbrust] Fix comments
78fa02d [Michael Armbrust] off by default
e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter 
pushdown


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e47c3876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e47c3876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e47c3876

Branch: refs/heads/master
Commit: e47c38763914aaf89a7a851c5f41b7549a75615b
Parents: a0300ea
Author: Michael Armbrust mich...@databricks.com
Authored: Fri Nov 14 14:59:35 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 14:59:35 2014 -0800

--
 .../src/main/scala/org/apache/spark/sql/SQLConf.scala  |  8 +++-
 .../apache/spark/sql/execution/SparkStrategies.scala   |  7 +--
 .../org/apache/spark/sql/parquet/ParquetFilters.scala  |  2 --
 .../spark/sql/parquet/ParquetTableOperations.scala | 13 +++--
 .../apache/spark/sql/parquet/ParquetQuerySuite.scala   |  2 ++
 5 files changed, 21 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 279495a..cd7d78e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
 
 import java.util.Properties
 
-
 private[spark] object SQLConf {
   val COMPRESS_CACHED = spark.sql.inMemoryColumnarStorage.compressed
   val COLUMN_BATCH_SIZE = spark.sql.inMemoryColumnarStorage.batchSize
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = spark.sql.shuffle.partitions
   val CODEGEN_ENABLED = spark.sql.codegen
   val DIALECT = spark.sql.dialect
+
   val PARQUET_BINARY_AS_STRING = spark.sql.parquet.binaryAsString
   val PARQUET_CACHE_METADATA = spark.sql.parquet.cacheMetadata
   val PARQUET_COMPRESSION = spark.sql.parquet.compression.codec
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = spark.sql.parquet.filterPushdown
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = spark.sql.columnNameOfCorruptRecord
 
   // This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, 
200).toInt
 
+  /** When true predicates will be passed to the parquet record reader when 
possible. */
+  private[spark] def parquetFilterPushDown =
+getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, false).toBoolean
+
   /**
* When set to true, Spark SQL will use the Scala compiler at runtime to 
generate custom bytecode
* that evaluates expressions found in queries.  In general this custom code 
runs much faster

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cc7e0c0..03cd5bd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
   case PhysicalOperation(projectList, filters: Seq[Expression], relation: 
ParquetRelation) =
 val prunePushedDownFilters =
-  if 
(sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, 
true)) {
+  if (sqlContext.parquetFilterPushDown) {
 (filters: Seq[Expression]) = {
   filters.filter { filter =
 

spark git commit: [SPARK-4391][SQL] Configure parquet filters using SQLConf

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 0dd924178 - 576688aa2


[SPARK-4391][SQL] Configure parquet filters using SQLConf

This is more uniform with the rest of SQL configuration and allows it to be 
turned on and off without restarting the SparkContext.  In this PR I also turn 
off filter pushdown by default due to a number of outstanding issues (in 
particular SPARK-4258).  When those are fixed we should turn it back on by 
default.

Author: Michael Armbrust mich...@databricks.com

Closes #3258 from marmbrus/parquetFilters and squashes the following commits:

5655bfe [Michael Armbrust] Remove extra line.
15e9a98 [Michael Armbrust] Enable filters for tests
75afd39 [Michael Armbrust] Fix comments
78fa02d [Michael Armbrust] off by default
e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter 
pushdown

(cherry picked from commit e47c38763914aaf89a7a851c5f41b7549a75615b)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/576688aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/576688aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/576688aa

Branch: refs/heads/branch-1.2
Commit: 576688aa2a19bd4ba239a2b93af7947f983e5124
Parents: 0dd9241
Author: Michael Armbrust mich...@databricks.com
Authored: Fri Nov 14 14:59:35 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 14:59:45 2014 -0800

--
 .../src/main/scala/org/apache/spark/sql/SQLConf.scala  |  8 +++-
 .../apache/spark/sql/execution/SparkStrategies.scala   |  7 +--
 .../org/apache/spark/sql/parquet/ParquetFilters.scala  |  2 --
 .../spark/sql/parquet/ParquetTableOperations.scala | 13 +++--
 .../apache/spark/sql/parquet/ParquetQuerySuite.scala   |  2 ++
 5 files changed, 21 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/576688aa/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 279495a..cd7d78e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
 
 import java.util.Properties
 
-
 private[spark] object SQLConf {
   val COMPRESS_CACHED = spark.sql.inMemoryColumnarStorage.compressed
   val COLUMN_BATCH_SIZE = spark.sql.inMemoryColumnarStorage.batchSize
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = spark.sql.shuffle.partitions
   val CODEGEN_ENABLED = spark.sql.codegen
   val DIALECT = spark.sql.dialect
+
   val PARQUET_BINARY_AS_STRING = spark.sql.parquet.binaryAsString
   val PARQUET_CACHE_METADATA = spark.sql.parquet.cacheMetadata
   val PARQUET_COMPRESSION = spark.sql.parquet.compression.codec
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = spark.sql.parquet.filterPushdown
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = spark.sql.columnNameOfCorruptRecord
 
   // This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, 
200).toInt
 
+  /** When true predicates will be passed to the parquet record reader when 
possible. */
+  private[spark] def parquetFilterPushDown =
+getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, false).toBoolean
+
   /**
* When set to true, Spark SQL will use the Scala compiler at runtime to 
generate custom bytecode
* that evaluates expressions found in queries.  In general this custom code 
runs much faster

http://git-wip-us.apache.org/repos/asf/spark/blob/576688aa/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cc7e0c0..03cd5bd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
   case PhysicalOperation(projectList, filters: Seq[Expression], relation: 
ParquetRelation) =
 val prunePushedDownFilters =
-  if 
(sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, 
true)) {
+   

spark git commit: [SQL] Minor cleanup of comments, errors and override.

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master e47c38763 - f805025e8


[SQL] Minor cleanup of comments, errors and override.

Author: Michael Armbrust mich...@databricks.com

Closes #3257 from marmbrus/minorCleanup and squashes the following commits:

d8b5abc [Michael Armbrust] Use interpolation.
2fdf903 [Michael Armbrust] Better error message when coalesce can't be resolved.
f9fa6cf [Michael Armbrust] Methods in a final class do not also need to be 
final, use override.
199fd98 [Michael Armbrust] Fix typo


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f805025e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f805025e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f805025e

Branch: refs/heads/master
Commit: f805025e8efe9cd522e8875141ec27df8d16bbe0
Parents: e47c387
Author: Michael Armbrust mich...@databricks.com
Authored: Fri Nov 14 15:00:42 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:00:42 2014 -0800

--
 .../spark/sql/catalyst/expressions/aggregates.scala |  2 +-
 .../expressions/codegen/GenerateProjection.scala| 16 
 .../sql/catalyst/expressions/nullFunctions.scala|  4 +++-
 3 files changed, 12 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f805025e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 2b364fc..3ceb5ec 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -304,7 +304,7 @@ case class Average(child: Expression) extends 
PartialAggregate with trees.UnaryN
 
 child.dataType match {
   case DecimalType.Fixed(_, _) =
-// Turn the results to unlimited decimals for the divsion, before 
going back to fixed
+// Turn the results to unlimited decimals for the division, before 
going back to fixed
 val castedSum = Cast(Sum(partialSum.toAttribute), 
DecimalType.Unlimited)
 val castedCount = Cast(Sum(partialCount.toAttribute), 
DecimalType.Unlimited)
 SplitEvaluation(

http://git-wip-us.apache.org/repos/asf/spark/blob/f805025e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index 7871a62..2ff6116 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -53,8 +53,8 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 val nullFunctions =
   q
 private[this] var nullBits = new Array[Boolean](${expressions.size})
-final def setNullAt(i: Int) = { nullBits(i) = true }
-final def isNullAt(i: Int) = nullBits(i)
+override def setNullAt(i: Int) = { nullBits(i) = true }
+override def isNullAt(i: Int) = nullBits(i)
   .children
 
 val tupleElements = expressions.zipWithIndex.flatMap {
@@ -82,7 +82,7 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 val iLit = ru.Literal(Constant(i))
 qif(isNullAt($iLit)) { null } else { ${newTermName(sc$i)} }
   }
-  qfinal def iterator = Iterator[Any](..$allColumns)
+  qoverride def iterator = Iterator[Any](..$allColumns)
 }
 
 val accessorFailure = qscala.sys.error(Invalid ordinal: + i)
@@ -94,7 +94,7 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 
 qif(i == $ordinal) { if(isNullAt($i)) return null else return 
$elementName }
   }
-  qfinal def apply(i: Int): Any = { ..$cases; $accessorFailure }
+  qoverride def apply(i: Int): Any = { ..$cases; $accessorFailure }
 }
 
 val updateFunction = {
@@ -114,7 +114,7 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 return
   }
   }
-  qfinal def update(i: Int, value: Any): Unit = { ..$cases; 
$accessorFailure }
+  qoverride def update(i: Int, value: Any): Unit = { ..$cases; 
$accessorFailure }
 }
 
 val 

spark git commit: [SQL] Minor cleanup of comments, errors and override.

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 576688aa2 - e35672e7e


[SQL] Minor cleanup of comments, errors and override.

Author: Michael Armbrust mich...@databricks.com

Closes #3257 from marmbrus/minorCleanup and squashes the following commits:

d8b5abc [Michael Armbrust] Use interpolation.
2fdf903 [Michael Armbrust] Better error message when coalesce can't be resolved.
f9fa6cf [Michael Armbrust] Methods in a final class do not also need to be 
final, use override.
199fd98 [Michael Armbrust] Fix typo

(cherry picked from commit f805025e8efe9cd522e8875141ec27df8d16bbe0)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e35672e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e35672e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e35672e7

Branch: refs/heads/branch-1.2
Commit: e35672e7edeb7f68bece12d3d656419d3e610e95
Parents: 576688a
Author: Michael Armbrust mich...@databricks.com
Authored: Fri Nov 14 15:00:42 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:00:51 2014 -0800

--
 .../spark/sql/catalyst/expressions/aggregates.scala |  2 +-
 .../expressions/codegen/GenerateProjection.scala| 16 
 .../sql/catalyst/expressions/nullFunctions.scala|  4 +++-
 3 files changed, 12 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e35672e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 2b364fc..3ceb5ec 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -304,7 +304,7 @@ case class Average(child: Expression) extends 
PartialAggregate with trees.UnaryN
 
 child.dataType match {
   case DecimalType.Fixed(_, _) =
-// Turn the results to unlimited decimals for the divsion, before 
going back to fixed
+// Turn the results to unlimited decimals for the division, before 
going back to fixed
 val castedSum = Cast(Sum(partialSum.toAttribute), 
DecimalType.Unlimited)
 val castedCount = Cast(Sum(partialCount.toAttribute), 
DecimalType.Unlimited)
 SplitEvaluation(

http://git-wip-us.apache.org/repos/asf/spark/blob/e35672e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index 7871a62..2ff6116 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -53,8 +53,8 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 val nullFunctions =
   q
 private[this] var nullBits = new Array[Boolean](${expressions.size})
-final def setNullAt(i: Int) = { nullBits(i) = true }
-final def isNullAt(i: Int) = nullBits(i)
+override def setNullAt(i: Int) = { nullBits(i) = true }
+override def isNullAt(i: Int) = nullBits(i)
   .children
 
 val tupleElements = expressions.zipWithIndex.flatMap {
@@ -82,7 +82,7 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 val iLit = ru.Literal(Constant(i))
 qif(isNullAt($iLit)) { null } else { ${newTermName(sc$i)} }
   }
-  qfinal def iterator = Iterator[Any](..$allColumns)
+  qoverride def iterator = Iterator[Any](..$allColumns)
 }
 
 val accessorFailure = qscala.sys.error(Invalid ordinal: + i)
@@ -94,7 +94,7 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 
 qif(i == $ordinal) { if(isNullAt($i)) return null else return 
$elementName }
   }
-  qfinal def apply(i: Int): Any = { ..$cases; $accessorFailure }
+  qoverride def apply(i: Int): Any = { ..$cases; $accessorFailure }
 }
 
 val updateFunction = {
@@ -114,7 +114,7 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 return
   }
   }
-  qfinal def update(i: Int, value: Any): Unit = { ..$cases; 

spark git commit: [SQL] Don't shuffle code generated rows

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 e35672e7e - 680bc0619


[SQL] Don't shuffle code generated rows

When sort based shuffle and code gen are on we were trying to ship the code 
generated rows during a shuffle.  This doesn't work because the classes don't 
exist on the other side.  Instead we now copy into a generic row before 
shipping.

Author: Michael Armbrust mich...@databricks.com

Closes #3263 from marmbrus/aggCodeGen and squashes the following commits:

f6ba8cf [Michael Armbrust] fix and test

(cherry picked from commit 4b4b50c9e596673c1534df97effad50d107a8007)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/680bc061
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/680bc061
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/680bc061

Branch: refs/heads/branch-1.2
Commit: 680bc06195ecdc6ff2390c55adeb637649f2c8f3
Parents: e35672e
Author: Michael Armbrust mich...@databricks.com
Authored: Fri Nov 14 15:03:23 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:03:45 2014 -0800

--
 .../main/scala/org/apache/spark/sql/execution/Exchange.scala  | 4 ++--
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala   | 7 +++
 2 files changed, 9 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/680bc061/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 927f400..cff7a01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -47,8 +47,8 @@ case class Exchange(newPartitioning: Partitioning, child: 
SparkPlan) extends Una
 // TODO: Eliminate redundant expressions in grouping key and value.
 val rdd = if (sortBasedShuffleOn) {
   child.execute().mapPartitions { iter =
-val hashExpressions = newProjection(expressions, child.output)
-iter.map(r = (hashExpressions(r), r.copy()))
+val hashExpressions = newMutableProjection(expressions, 
child.output)()
+iter.map(r = (hashExpressions(r).copy(), r.copy()))
   }
 } else {
   child.execute().mapPartitions { iter =

http://git-wip-us.apache.org/repos/asf/spark/blob/680bc061/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8a80724..5dd777f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -72,6 +72,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll 
{
   2.5)
   }
 
+  test(aggregation with codegen) {
+val originalValue = codegenEnabled
+setConf(SQLConf.CODEGEN_ENABLED, true)
+sql(SELECT key FROM testData GROUP BY key).collect()
+setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+  }
+
   test(SPARK-3176 Added Parser of SQL LAST()) {
 checkAnswer(
   sql(SELECT LAST(n) FROM lowerCaseData),


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4322][SQL] Enables struct fields as sub expressions of grouping fields

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 4b4b50c9e - 0c7b66bd4


[SPARK-4322][SQL] Enables struct fields as sub expressions of grouping fields

While resolving struct fields, the resulted `GetField` expression is wrapped 
with an `Alias` to make it a named expression. Assume `a` is a struct instance 
with a field `b`, then `a.b` will be resolved as `Alias(GetField(a, b), 
b)`. Thus, for this following SQL query:

```sql
SELECT a.b + 1 FROM t GROUP BY a.b + 1
```

the grouping expression is

```scala
Add(GetField(a, b), Literal(1, IntegerType))
```

while the aggregation expression is

```scala
Add(Alias(GetField(a, b), b), Literal(1, IntegerType))
```

This mismatch makes the above SQL query fail during the both analysis and 
execution phases. This PR fixes this issue by removing the alias when 
substituting aggregation expressions.

!-- Reviewable:start --
[img src=https://reviewable.io/review_button.png; height=40 alt=Review on 
Reviewable/](https://reviewable.io/reviews/apache/spark/3248)
!-- Reviewable:end --

Author: Cheng Lian l...@databricks.com

Closes #3248 from liancheng/spark-4322 and squashes the following commits:

23a46ea [Cheng Lian] Code simplification
dd20a79 [Cheng Lian] Should only trim aliases around `GetField`s
7f46532 [Cheng Lian] Enables struct fields as sub expressions of grouping fields


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c7b66bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c7b66bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c7b66bd

Branch: refs/heads/master
Commit: 0c7b66bd449093bb5d2dafaf91d54e63e601e320
Parents: 4b4b50c
Author: Cheng Lian l...@databricks.com
Authored: Fri Nov 14 15:09:36 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:09:36 2014 -0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 27 +---
 .../spark/sql/catalyst/planning/patterns.scala  | 15 ---
 .../org/apache/spark/sql/SQLQuerySuite.scala| 12 -
 3 files changed, 34 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c7b66bd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a448c79..d3b4cf8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -60,7 +60,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, 
caseSensitive: Bool
   ResolveFunctions ::
   GlobalAggregates ::
   UnresolvedHavingClauseAttributes ::
-  TrimAliases ::
+  TrimGroupingAliases ::
   typeCoercionRules ++
   extendedRules : _*),
 Batch(Check Analysis, Once,
@@ -93,17 +93,10 @@ class Analyzer(catalog: Catalog, registry: 
FunctionRegistry, caseSensitive: Bool
   /**
* Removes no-op Alias expressions from the plan.
*/
-  object TrimAliases extends Rule[LogicalPlan] {
+  object TrimGroupingAliases extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan transform {
   case Aggregate(groups, aggs, child) =
-Aggregate(
-  groups.map {
-_ transform {
-  case Alias(c, _) = c
-}
-  },
-  aggs,
-  child)
+Aggregate(groups.map(_.transform { case Alias(c, _) = c }), aggs, 
child)
 }
   }
 
@@ -122,10 +115,15 @@ class Analyzer(catalog: Catalog, registry: 
FunctionRegistry, caseSensitive: Bool
 case e = e.children.forall(isValidAggregateExpression)
   }
 
-  aggregateExprs.foreach { e =
-if (!isValidAggregateExpression(e)) {
-  throw new TreeNodeException(plan, sExpression not in GROUP BY: 
$e)
-}
+  aggregateExprs.find { e =
+!isValidAggregateExpression(e.transform {
+  // Should trim aliases around `GetField`s. These aliases are 
introduced while
+  // resolving struct field accesses, because `GetField` is not a 
`NamedExpression`.
+  // (Should we just turn `GetField` into a `NamedExpression`?)
+  case Alias(g: GetField, _) = g
+})
+  }.foreach { e =
+throw new TreeNodeException(plan, sExpression not in GROUP BY: 
$e)
   }
 
   aggregatePlan
@@ -328,4 +326,3 @@ object EliminateAnalysisOperators extends Rule[LogicalPlan] 
{
 case Subquery(_, child) = child
   }
 }
-


spark git commit: [SPARK-4322][SQL] Enables struct fields as sub expressions of grouping fields

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 680bc0619 - 1cac30083


[SPARK-4322][SQL] Enables struct fields as sub expressions of grouping fields

While resolving struct fields, the resulted `GetField` expression is wrapped 
with an `Alias` to make it a named expression. Assume `a` is a struct instance 
with a field `b`, then `a.b` will be resolved as `Alias(GetField(a, b), 
b)`. Thus, for this following SQL query:

```sql
SELECT a.b + 1 FROM t GROUP BY a.b + 1
```

the grouping expression is

```scala
Add(GetField(a, b), Literal(1, IntegerType))
```

while the aggregation expression is

```scala
Add(Alias(GetField(a, b), b), Literal(1, IntegerType))
```

This mismatch makes the above SQL query fail during the both analysis and 
execution phases. This PR fixes this issue by removing the alias when 
substituting aggregation expressions.

!-- Reviewable:start --
[img src=https://reviewable.io/review_button.png; height=40 alt=Review on 
Reviewable/](https://reviewable.io/reviews/apache/spark/3248)
!-- Reviewable:end --

Author: Cheng Lian l...@databricks.com

Closes #3248 from liancheng/spark-4322 and squashes the following commits:

23a46ea [Cheng Lian] Code simplification
dd20a79 [Cheng Lian] Should only trim aliases around `GetField`s
7f46532 [Cheng Lian] Enables struct fields as sub expressions of grouping fields

(cherry picked from commit 0c7b66bd449093bb5d2dafaf91d54e63e601e320)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cac3008
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cac3008
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cac3008

Branch: refs/heads/branch-1.2
Commit: 1cac30083b97c98c3663e2d2cd057124f033eb34
Parents: 680bc06
Author: Cheng Lian l...@databricks.com
Authored: Fri Nov 14 15:09:36 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:09:55 2014 -0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 27 +---
 .../spark/sql/catalyst/planning/patterns.scala  | 15 ---
 .../org/apache/spark/sql/SQLQuerySuite.scala| 12 -
 3 files changed, 34 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1cac3008/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a448c79..d3b4cf8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -60,7 +60,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, 
caseSensitive: Bool
   ResolveFunctions ::
   GlobalAggregates ::
   UnresolvedHavingClauseAttributes ::
-  TrimAliases ::
+  TrimGroupingAliases ::
   typeCoercionRules ++
   extendedRules : _*),
 Batch(Check Analysis, Once,
@@ -93,17 +93,10 @@ class Analyzer(catalog: Catalog, registry: 
FunctionRegistry, caseSensitive: Bool
   /**
* Removes no-op Alias expressions from the plan.
*/
-  object TrimAliases extends Rule[LogicalPlan] {
+  object TrimGroupingAliases extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan transform {
   case Aggregate(groups, aggs, child) =
-Aggregate(
-  groups.map {
-_ transform {
-  case Alias(c, _) = c
-}
-  },
-  aggs,
-  child)
+Aggregate(groups.map(_.transform { case Alias(c, _) = c }), aggs, 
child)
 }
   }
 
@@ -122,10 +115,15 @@ class Analyzer(catalog: Catalog, registry: 
FunctionRegistry, caseSensitive: Bool
 case e = e.children.forall(isValidAggregateExpression)
   }
 
-  aggregateExprs.foreach { e =
-if (!isValidAggregateExpression(e)) {
-  throw new TreeNodeException(plan, sExpression not in GROUP BY: 
$e)
-}
+  aggregateExprs.find { e =
+!isValidAggregateExpression(e.transform {
+  // Should trim aliases around `GetField`s. These aliases are 
introduced while
+  // resolving struct field accesses, because `GetField` is not a 
`NamedExpression`.
+  // (Should we just turn `GetField` into a `NamedExpression`?)
+  case Alias(g: GetField, _) = g
+})
+  }.foreach { e =
+throw new TreeNodeException(plan, sExpression not in GROUP BY: 
$e)
   }
 
   aggregatePlan
@@ -328,4 +326,3 @@ object 

spark git commit: [SPARK-4386] Improve performance when writing Parquet files.

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 0c7b66bd4 - f76b96837


[SPARK-4386] Improve performance when writing Parquet files.

If you profile the writing of a Parquet file, the single worst time consuming 
call inside of org.apache.spark.sql.parquet.MutableRowWriteSupport.write is 
actually in the scala.collection.AbstractSequence.size call. This is because 
the size call actually ends up COUNTING the elements in a 
scala.collection.LinearSeqOptimized.length (optimized?).

This doesn't need to be done. size is called repeatedly where needed rather 
than called once at the top of the method and stored in a 'val'.

Author: Jim Carroll j...@dontcallme.com

Closes #3254 from jimfcarroll/parquet-perf and squashes the following commits:

30cc0b5 [Jim Carroll] Improve performance when writing Parquet files.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f76b9683
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f76b9683
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f76b9683

Branch: refs/heads/master
Commit: f76b9683706232c3d4e8e6e61627b8188dcb79dc
Parents: 0c7b66b
Author: Jim Carroll j...@dontcallme.com
Authored: Fri Nov 14 15:11:53 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:11:53 2014 -0800

--
 .../spark/sql/parquet/ParquetTableSupport.scala   | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f76b9683/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 7bc2496..ef3687e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -152,14 +152,15 @@ private[parquet] class RowWriteSupport extends 
WriteSupport[Row] with Logging {
   }
 
   override def write(record: Row): Unit = {
-if (attributes.size  record.size) {
+val attributesSize = attributes.size
+if (attributesSize  record.size) {
   throw new IndexOutOfBoundsException(
-sTrying to write more fields than contained in row 
(${attributes.size}${record.size}))
+sTrying to write more fields than contained in row 
(${attributesSize}${record.size}))
 }
 
 var index = 0
 writer.startMessage()
-while(index  attributes.size) {
+while(index  attributesSize) {
   // null values indicate optional fields but we do not check currently
   if (record(index) != null) {
 writer.startField(attributes(index).name, index)
@@ -312,14 +313,15 @@ private[parquet] class RowWriteSupport extends 
WriteSupport[Row] with Logging {
 // Optimized for non-nested rows
 private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
   override def write(record: Row): Unit = {
-if (attributes.size  record.size) {
+val attributesSize = attributes.size
+if (attributesSize  record.size) {
   throw new IndexOutOfBoundsException(
-sTrying to write more fields than contained in row 
(${attributes.size}${record.size}))
+sTrying to write more fields than contained in row 
(${attributesSize}${record.size}))
 }
 
 var index = 0
 writer.startMessage()
-while(index  attributes.size) {
+while(index  attributesSize) {
   // null values indicate optional fields but we do not check currently
   if (record(index) != null  record(index) != Nil) {
 writer.startField(attributes(index).name, index)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4386] Improve performance when writing Parquet files.

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 1cac30083 - 7f242dc29


[SPARK-4386] Improve performance when writing Parquet files.

If you profile the writing of a Parquet file, the single worst time consuming 
call inside of org.apache.spark.sql.parquet.MutableRowWriteSupport.write is 
actually in the scala.collection.AbstractSequence.size call. This is because 
the size call actually ends up COUNTING the elements in a 
scala.collection.LinearSeqOptimized.length (optimized?).

This doesn't need to be done. size is called repeatedly where needed rather 
than called once at the top of the method and stored in a 'val'.

Author: Jim Carroll j...@dontcallme.com

Closes #3254 from jimfcarroll/parquet-perf and squashes the following commits:

30cc0b5 [Jim Carroll] Improve performance when writing Parquet files.

(cherry picked from commit f76b9683706232c3d4e8e6e61627b8188dcb79dc)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f242dc2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f242dc2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f242dc2

Branch: refs/heads/branch-1.2
Commit: 7f242dc2911bbc821e90fed81421af9b8d6dcd9a
Parents: 1cac300
Author: Jim Carroll j...@dontcallme.com
Authored: Fri Nov 14 15:11:53 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:12:07 2014 -0800

--
 .../spark/sql/parquet/ParquetTableSupport.scala   | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f242dc2/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 7bc2496..ef3687e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -152,14 +152,15 @@ private[parquet] class RowWriteSupport extends 
WriteSupport[Row] with Logging {
   }
 
   override def write(record: Row): Unit = {
-if (attributes.size  record.size) {
+val attributesSize = attributes.size
+if (attributesSize  record.size) {
   throw new IndexOutOfBoundsException(
-sTrying to write more fields than contained in row 
(${attributes.size}${record.size}))
+sTrying to write more fields than contained in row 
(${attributesSize}${record.size}))
 }
 
 var index = 0
 writer.startMessage()
-while(index  attributes.size) {
+while(index  attributesSize) {
   // null values indicate optional fields but we do not check currently
   if (record(index) != null) {
 writer.startField(attributes(index).name, index)
@@ -312,14 +313,15 @@ private[parquet] class RowWriteSupport extends 
WriteSupport[Row] with Logging {
 // Optimized for non-nested rows
 private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
   override def write(record: Row): Unit = {
-if (attributes.size  record.size) {
+val attributesSize = attributes.size
+if (attributesSize  record.size) {
   throw new IndexOutOfBoundsException(
-sTrying to write more fields than contained in row 
(${attributes.size}${record.size}))
+sTrying to write more fields than contained in row 
(${attributesSize}${record.size}))
 }
 
 var index = 0
 writer.startMessage()
-while(index  attributes.size) {
+while(index  attributesSize) {
   // null values indicate optional fields but we do not check currently
   if (record(index) != null  record(index) != Nil) {
 writer.startField(attributes(index).name, index)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4365][SQL] Remove unnecessary filter call on records returned from parquet library

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master f76b96837 - 63ca3af66


[SPARK-4365][SQL] Remove unnecessary filter call on records returned from 
parquet library

Since parquet library has been updated , we no longer need to filter the 
records returned from parquet library for null records , as now the library 
skips those :

from 
parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java

public boolean nextKeyValue() throws IOException, InterruptedException {
boolean recordFound = false;
while (!recordFound) {
// no more records left
if (current = total)
{ return false; }
try {
checkRead();
currentValue = recordReader.read();
current ++;
if (recordReader.shouldSkipCurrentRecord())
{
 // this record is being filtered via the filter2 package
if (DEBUG) LOG.debug(skipping record);
 continue;
 }
if (currentValue == null)
{
// only happens with FilteredRecordReader at end of block current = 
totalCountLoadedSoFar;
 if (DEBUG) LOG.debug(filtered record reader reached end of block);
 continue;
}

recordFound = true;
if (DEBUG) LOG.debug(read value:  + currentValue);
} catch (RuntimeException e)
{ throw new ParquetDecodingException(format(Can not read value at %d in block 
%d in file %s, current, currentBlock, file), e); }

}
return true;
}

Author: Yash Datta yash.da...@guavus.com

Closes #3229 from saucam/remove_filter and squashes the following commits:

8909ae9 [Yash Datta] SPARK-4365: Remove unnecessary filter call on records 
returned from parquet library


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63ca3af6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63ca3af6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63ca3af6

Branch: refs/heads/master
Commit: 63ca3af66f9680fd12adee82fb4d342caae5cea4
Parents: f76b968
Author: Yash Datta yash.da...@guavus.com
Authored: Fri Nov 14 15:16:36 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:16:40 2014 -0800

--
 .../org/apache/spark/sql/parquet/ParquetTableOperations.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/63ca3af6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5f93279..f6bed50 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -159,7 +159,7 @@ case class ParquetTableScan(
   }
 } else {
   baseRDD.map(_._2)
-}.filter(_ != null) // Parquet's record filters may produce null values
+}
   }
 
   /**


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4365][SQL] Remove unnecessary filter call on records returned from parquet library

2014-11-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 7f242dc29 - aa5d8e57c


[SPARK-4365][SQL] Remove unnecessary filter call on records returned from 
parquet library

Since parquet library has been updated , we no longer need to filter the 
records returned from parquet library for null records , as now the library 
skips those :

from 
parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java

public boolean nextKeyValue() throws IOException, InterruptedException {
boolean recordFound = false;
while (!recordFound) {
// no more records left
if (current = total)
{ return false; }
try {
checkRead();
currentValue = recordReader.read();
current ++;
if (recordReader.shouldSkipCurrentRecord())
{
 // this record is being filtered via the filter2 package
if (DEBUG) LOG.debug(skipping record);
 continue;
 }
if (currentValue == null)
{
// only happens with FilteredRecordReader at end of block current = 
totalCountLoadedSoFar;
 if (DEBUG) LOG.debug(filtered record reader reached end of block);
 continue;
}

recordFound = true;
if (DEBUG) LOG.debug(read value:  + currentValue);
} catch (RuntimeException e)
{ throw new ParquetDecodingException(format(Can not read value at %d in block 
%d in file %s, current, currentBlock, file), e); }

}
return true;
}

Author: Yash Datta yash.da...@guavus.com

Closes #3229 from saucam/remove_filter and squashes the following commits:

8909ae9 [Yash Datta] SPARK-4365: Remove unnecessary filter call on records 
returned from parquet library

(cherry picked from commit 63ca3af66f9680fd12adee82fb4d342caae5cea4)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa5d8e57
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa5d8e57
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa5d8e57

Branch: refs/heads/branch-1.2
Commit: aa5d8e57c63d045b291a5c1fc99e782a0f191854
Parents: 7f242dc
Author: Yash Datta yash.da...@guavus.com
Authored: Fri Nov 14 15:16:36 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Nov 14 15:17:12 2014 -0800

--
 .../org/apache/spark/sql/parquet/ParquetTableOperations.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aa5d8e57/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5f93279..f6bed50 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -159,7 +159,7 @@ case class ParquetTableScan(
   }
 } else {
   baseRDD.map(_._2)
-}.filter(_ != null) // Parquet's record filters may produce null values
+}
   }
 
   /**


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: SPARK-4214. With dynamic allocation, avoid outstanding requests for more...

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 ef39ec419 - c425e31ad


SPARK-4214. With dynamic allocation, avoid outstanding requests for more...

... executors than pending tasks need.

WIP. Still need to add and fix tests.

Author: Sandy Ryza sa...@cloudera.com

Closes #3204 from sryza/sandy-spark-4214 and squashes the following commits:

35cf0e0 [Sandy Ryza] Add comment
13b53df [Sandy Ryza] Review feedback
067465f [Sandy Ryza] Whitespace fix
6ae080c [Sandy Ryza] Add tests and get num pending tasks from 
ExecutorAllocationListener
531e2b6 [Sandy Ryza] SPARK-4214. With dynamic allocation, avoid outstanding 
requests for more executors than pending tasks need.

(cherry picked from commit ad42b283246b93654c5fd731cd618fee74d8c4da)
Signed-off-by: Andrew Or and...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c425e31a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c425e31a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c425e31a

Branch: refs/heads/branch-1.2
Commit: c425e31ad0132ddb0a817b26fe1e5d11a7ef7a63
Parents: ef39ec4
Author: Sandy Ryza sa...@cloudera.com
Authored: Fri Nov 14 15:51:05 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 15:51:50 2014 -0800

--
 .../spark/ExecutorAllocationManager.scala   | 55 
 .../spark/ExecutorAllocationManagerSuite.scala  | 48 +
 2 files changed, 94 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c425e31a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index ef93009..88adb89 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -28,7 +28,9 @@ import org.apache.spark.scheduler._
  * the scheduler queue is not drained in N seconds, then new executors are 
added. If the queue
  * persists for another M seconds, then more executors are added and so on. 
The number added
  * in each round increases exponentially from the previous round until an 
upper bound on the
- * number of executors has been reached.
+ * number of executors has been reached. The upper bound is based both on a 
configured property
+ * and on the number of tasks pending: the policy will never increase the 
number of executor
+ * requests past the number needed to handle all pending tasks.
  *
  * The rationale for the exponential increase is twofold: (1) Executors should 
be added slowly
  * in the beginning in case the number of extra executors needed turns out to 
be small. Otherwise,
@@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: 
SparkContext) extends Logging
   // During testing, the methods to actually kill and add executors are mocked 
out
   private val testing = conf.getBoolean(spark.dynamicAllocation.testing, 
false)
 
+  // TODO: The default value of 1 for spark.executor.cores works right now 
because dynamic
+  // allocation is only supported for YARN and the default number of cores per 
executor in YARN is
+  // 1, but it might need to be attained differently for different cluster 
managers
+  private val tasksPerExecutor =
+conf.getInt(spark.executor.cores, 1) / conf.getInt(spark.task.cpus, 1)
+
   validateSettings()
 
   // Number of executors to add in the next round
@@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: 
SparkContext) extends Logging
   // Clock used to schedule when executors should be added and removed
   private var clock: Clock = new RealClock
 
+  // Listener for Spark events that impact the allocation policy
+  private val listener = new ExecutorAllocationListener(this)
+
   /**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: 
SparkContext) extends Logging
   throw new SparkException(Dynamic allocation of executors requires the 
external  +
 shuffle service. You may enable this through 
spark.shuffle.service.enabled.)
 }
+if (tasksPerExecutor == 0) {
+  throw new SparkException(spark.executor.cores must not be less than 
spark.task.cpus.cores)
+}
   }
 
   /**
@@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: 
SparkContext) extends Logging
* Register for scheduler callbacks to decide when to add and remove 
executors.
*/
   def start(): Unit = {
-val listener = new ExecutorAllocationListener(this)
 

spark git commit: [SPARK-4404]SparkSubmitDriverBootstrapper should stop after its SparkSubmit sub-proc...

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master ad42b2832 - 303a4e4d2


[SPARK-4404]SparkSubmitDriverBootstrapper should stop after its SparkSubmit 
sub-proc...

...ess ends

https://issues.apache.org/jira/browse/SPARK-4404

When we have spark.driver.extra* or spark.driver.memory in 
SPARK_SUBMIT_PROPERTIES_FILE, spark-class will use 
SparkSubmitDriverBootstrapper to launch driver.
If we get process id of SparkSubmitDriverBootstrapper and wanna kill it during 
its running, we expect its SparkSubmit sub-process stop also.

Author: WangTao barneystin...@aliyun.com
Author: WangTaoTheTonic barneystin...@aliyun.com

Closes #3266 from WangTaoTheTonic/killsubmit and squashes the following commits:

e03eba5 [WangTaoTheTonic] add comments
57b5ca1 [WangTao] SparkSubmitDriverBootstrapper should stop after its 
SparkSubmit sub-process ends


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/303a4e4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/303a4e4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/303a4e4d

Branch: refs/heads/master
Commit: 303a4e4d23e5cd93b541480cf88d5badb9cf9622
Parents: ad42b28
Author: WangTao barneystin...@aliyun.com
Authored: Fri Nov 14 20:11:51 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 20:11:51 2014 -0800

--
 .../spark/deploy/SparkSubmitDriverBootstrapper.scala  | 10 ++
 1 file changed, 10 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/303a4e4d/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 2b894a7..729 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -129,6 +129,16 @@ private[spark] object SparkSubmitDriverBootstrapper {
 
 val process = builder.start()
 
+// If we kill an app while it's running, its sub-process should be killed 
too.
+Runtime.getRuntime().addShutdownHook(new Thread() {
+  override def run() = {
+if (process != null) {
+  process.destroy()
+  sys.exit(process.waitFor())
+}
+  }
+})
+
 // Redirect stdout and stderr from the child JVM
 val stdoutThread = new RedirectThread(process.getInputStream, System.out, 
redirect stdout)
 val stderrThread = new RedirectThread(process.getErrorStream, System.err, 
redirect stderr)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4404]SparkSubmitDriverBootstrapper should stop after its SparkSubmit sub-proc...

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 c425e31ad - 118c89c28


[SPARK-4404]SparkSubmitDriverBootstrapper should stop after its SparkSubmit 
sub-proc...

...ess ends

https://issues.apache.org/jira/browse/SPARK-4404

When we have spark.driver.extra* or spark.driver.memory in 
SPARK_SUBMIT_PROPERTIES_FILE, spark-class will use 
SparkSubmitDriverBootstrapper to launch driver.
If we get process id of SparkSubmitDriverBootstrapper and wanna kill it during 
its running, we expect its SparkSubmit sub-process stop also.

Author: WangTao barneystin...@aliyun.com
Author: WangTaoTheTonic barneystin...@aliyun.com

Closes #3266 from WangTaoTheTonic/killsubmit and squashes the following commits:

e03eba5 [WangTaoTheTonic] add comments
57b5ca1 [WangTao] SparkSubmitDriverBootstrapper should stop after its 
SparkSubmit sub-process ends

(cherry picked from commit 303a4e4d23e5cd93b541480cf88d5badb9cf9622)
Signed-off-by: Andrew Or and...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/118c89c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/118c89c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/118c89c2

Branch: refs/heads/branch-1.2
Commit: 118c89c28d1c3c048a5bd0335db4a0c65d71a4aa
Parents: c425e31
Author: WangTao barneystin...@aliyun.com
Authored: Fri Nov 14 20:11:51 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 20:12:05 2014 -0800

--
 .../spark/deploy/SparkSubmitDriverBootstrapper.scala  | 10 ++
 1 file changed, 10 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/118c89c2/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 2b894a7..729 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -129,6 +129,16 @@ private[spark] object SparkSubmitDriverBootstrapper {
 
 val process = builder.start()
 
+// If we kill an app while it's running, its sub-process should be killed 
too.
+Runtime.getRuntime().addShutdownHook(new Thread() {
+  override def run() = {
+if (process != null) {
+  process.destroy()
+  sys.exit(process.waitFor())
+}
+  }
+})
+
 // Redirect stdout and stderr from the child JVM
 val stdoutThread = new RedirectThread(process.getInputStream, System.out, 
redirect stdout)
 val stderrThread = new RedirectThread(process.getErrorStream, System.err, 
redirect stderr)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4415] [PySpark] JVM should exit after Python exit

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 118c89c28 - 306e68cf0


[SPARK-4415] [PySpark] JVM should exit after Python exit

When JVM is started in a Python process, it should exit once the stdin is 
closed.

test: add spark.driver.memory in conf/spark-defaults.conf

```
daviesdm:~/work/spark$ cat conf/spark-defaults.conf
spark.driver.memory   8g
daviesdm:~/work/spark$ bin/pyspark
 quit
daviesdm:~/work/spark$ jps
4931 Jps
286
daviesdm:~/work/spark$ python wc.py
943738
0.719928026199
daviesdm:~/work/spark$ jps
286
4990 Jps
```

Author: Davies Liu dav...@databricks.com

Closes #3274 from davies/exit and squashes the following commits:

df0e524 [Davies Liu] address comments
ce8599c [Davies Liu] address comments
050651f [Davies Liu] JVM should exit after Python exit

(cherry picked from commit 7fe08b43c78bf9e8515f671e72aa03a83ea782f8)
Signed-off-by: Andrew Or and...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/306e68cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/306e68cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/306e68cf

Branch: refs/heads/branch-1.2
Commit: 306e68cf00e6ec6b10f1a29eb7434f3f3ea27752
Parents: 118c89c
Author: Davies Liu dav...@databricks.com
Authored: Fri Nov 14 20:13:46 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 20:14:40 2014 -0800

--
 bin/pyspark  |  2 --
 bin/pyspark2.cmd |  1 -
 .../spark/deploy/SparkSubmitDriverBootstrapper.scala | 11 ++-
 python/pyspark/java_gateway.py   |  4 +++-
 4 files changed, 9 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/306e68cf/bin/pyspark
--
diff --git a/bin/pyspark b/bin/pyspark
index 1d8c94d..0b4f695 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -132,7 +132,5 @@ if [[ $1 =~ \.py$ ]]; then
   gatherSparkSubmitOpts $@
   exec $FWDIR/bin/spark-submit ${SUBMISSION_OPTS[@]} $primary 
${APPLICATION_OPTS[@]}
 else
-  # PySpark shell requires special handling downstream
-  export PYSPARK_SHELL=1
   exec $PYSPARK_DRIVER_PYTHON $PYSPARK_DRIVER_PYTHON_OPTS
 fi

http://git-wip-us.apache.org/repos/asf/spark/blob/306e68cf/bin/pyspark2.cmd
--
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 59415e9..a542ec8 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R \.py') do (
 )
 
 if [%PYTHON_FILE%] == [] (
-  set PYSPARK_SHELL=1
   if [%IPYTHON%] == [1] (
ipython %IPYTHON_OPTS%
   ) else (

http://git-wip-us.apache.org/repos/asf/spark/blob/306e68cf/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 729..aa3743c 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -149,14 +149,15 @@ private[spark] object SparkSubmitDriverBootstrapper {
 // subprocess there already reads directly from our stdin, so we should 
avoid spawning a
 // thread that contends with the subprocess in reading from System.in.
 val isWindows = Utils.isWindows
-val isPySparkShell = sys.env.contains(PYSPARK_SHELL)
+val isSubprocess = sys.env.contains(IS_SUBPROCESS)
 if (!isWindows) {
   val stdinThread = new RedirectThread(System.in, process.getOutputStream, 
redirect stdin)
   stdinThread.start()
-  // For the PySpark shell, Spark submit itself runs as a python 
subprocess, and so this JVM
-  // should terminate on broken pipe, which signals that the parent 
process has exited. In
-  // Windows, the termination logic for the PySpark shell is handled in 
java_gateway.py
-  if (isPySparkShell) {
+  // Spark submit (JVM) may run as a subprocess, and so this JVM should 
terminate on
+  // broken pipe, signaling that the parent process has exited. This is 
the case if the
+  // application is launched directly from python, as in the PySpark 
shell. In Windows,
+  // the termination logic is handled in java_gateway.py
+  if (isSubprocess) {
 stdinThread.join()
 process.destroy()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/306e68cf/python/pyspark/java_gateway.py
--
diff --git 

spark git commit: [SPARK-4415] [PySpark] JVM should exit after Python exit

2014-11-14 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 303a4e4d2 - 7fe08b43c


[SPARK-4415] [PySpark] JVM should exit after Python exit

When JVM is started in a Python process, it should exit once the stdin is 
closed.

test: add spark.driver.memory in conf/spark-defaults.conf

```
daviesdm:~/work/spark$ cat conf/spark-defaults.conf
spark.driver.memory   8g
daviesdm:~/work/spark$ bin/pyspark
 quit
daviesdm:~/work/spark$ jps
4931 Jps
286
daviesdm:~/work/spark$ python wc.py
943738
0.719928026199
daviesdm:~/work/spark$ jps
286
4990 Jps
```

Author: Davies Liu dav...@databricks.com

Closes #3274 from davies/exit and squashes the following commits:

df0e524 [Davies Liu] address comments
ce8599c [Davies Liu] address comments
050651f [Davies Liu] JVM should exit after Python exit


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fe08b43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fe08b43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fe08b43

Branch: refs/heads/master
Commit: 7fe08b43c78bf9e8515f671e72aa03a83ea782f8
Parents: 303a4e4
Author: Davies Liu dav...@databricks.com
Authored: Fri Nov 14 20:13:46 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Fri Nov 14 20:14:33 2014 -0800

--
 bin/pyspark  |  2 --
 bin/pyspark2.cmd |  1 -
 .../spark/deploy/SparkSubmitDriverBootstrapper.scala | 11 ++-
 python/pyspark/java_gateway.py   |  4 +++-
 4 files changed, 9 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/bin/pyspark
--
diff --git a/bin/pyspark b/bin/pyspark
index 1d8c94d..0b4f695 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -132,7 +132,5 @@ if [[ $1 =~ \.py$ ]]; then
   gatherSparkSubmitOpts $@
   exec $FWDIR/bin/spark-submit ${SUBMISSION_OPTS[@]} $primary 
${APPLICATION_OPTS[@]}
 else
-  # PySpark shell requires special handling downstream
-  export PYSPARK_SHELL=1
   exec $PYSPARK_DRIVER_PYTHON $PYSPARK_DRIVER_PYTHON_OPTS
 fi

http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/bin/pyspark2.cmd
--
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 59415e9..a542ec8 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R \.py') do (
 )
 
 if [%PYTHON_FILE%] == [] (
-  set PYSPARK_SHELL=1
   if [%IPYTHON%] == [1] (
ipython %IPYTHON_OPTS%
   ) else (

http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 729..aa3743c 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -149,14 +149,15 @@ private[spark] object SparkSubmitDriverBootstrapper {
 // subprocess there already reads directly from our stdin, so we should 
avoid spawning a
 // thread that contends with the subprocess in reading from System.in.
 val isWindows = Utils.isWindows
-val isPySparkShell = sys.env.contains(PYSPARK_SHELL)
+val isSubprocess = sys.env.contains(IS_SUBPROCESS)
 if (!isWindows) {
   val stdinThread = new RedirectThread(System.in, process.getOutputStream, 
redirect stdin)
   stdinThread.start()
-  // For the PySpark shell, Spark submit itself runs as a python 
subprocess, and so this JVM
-  // should terminate on broken pipe, which signals that the parent 
process has exited. In
-  // Windows, the termination logic for the PySpark shell is handled in 
java_gateway.py
-  if (isPySparkShell) {
+  // Spark submit (JVM) may run as a subprocess, and so this JVM should 
terminate on
+  // broken pipe, signaling that the parent process has exited. This is 
the case if the
+  // application is launched directly from python, as in the PySpark 
shell. In Windows,
+  // the termination logic is handled in java_gateway.py
+  if (isSubprocess) {
 stdinThread.join()
 process.destroy()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/python/pyspark/java_gateway.py
--
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 9c70fa5..a975dc1 100644
--- 

spark git commit: [SPARK-4379][Core] Change Exception to SparkException in checkpoint

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 7fe08b43c - dba140582


[SPARK-4379][Core] Change Exception to SparkException in checkpoint

It's better to change to SparkException. However, it's a breaking change since 
it will change the exception type.

Author: zsxwing zsxw...@gmail.com

Closes #3241 from zsxwing/SPARK-4379 and squashes the following commits:

409f3af [zsxwing] Change Exception to SparkException in checkpoint


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba14058
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba14058
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba14058

Branch: refs/heads/master
Commit: dba14058230194122a715c219e35ab8eaa786321
Parents: 7fe08b4
Author: zsxwing zsxw...@gmail.com
Authored: Fri Nov 14 22:25:41 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 22:25:41 2014 -0800

--
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dba14058/core/src/main/scala/org/apache/spark/rdd/RDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 716f2dd..cb64d43 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1202,7 +1202,7 @@ abstract class RDD[T: ClassTag](
*/
   def checkpoint() {
 if (context.checkpointDir.isEmpty) {
-  throw new Exception(Checkpoint directory has not been set in the 
SparkContext)
+  throw new SparkException(Checkpoint directory has not been set in the 
SparkContext)
 } else if (checkpointData.isEmpty) {
   checkpointData = Some(new RDDCheckpointData(this))
   checkpointData.get.markForCheckpoint()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4379][Core] Change Exception to SparkException in checkpoint

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 306e68cf0 - e27fa40ed


[SPARK-4379][Core] Change Exception to SparkException in checkpoint

It's better to change to SparkException. However, it's a breaking change since 
it will change the exception type.

Author: zsxwing zsxw...@gmail.com

Closes #3241 from zsxwing/SPARK-4379 and squashes the following commits:

409f3af [zsxwing] Change Exception to SparkException in checkpoint

(cherry picked from commit dba14058230194122a715c219e35ab8eaa786321)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e27fa40e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e27fa40e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e27fa40e

Branch: refs/heads/branch-1.2
Commit: e27fa40ed16c1b1d04911e0bdd803a4d43eb9a10
Parents: 306e68c
Author: zsxwing zsxw...@gmail.com
Authored: Fri Nov 14 22:25:41 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 22:25:49 2014 -0800

--
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e27fa40e/core/src/main/scala/org/apache/spark/rdd/RDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 716f2dd..cb64d43 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1202,7 +1202,7 @@ abstract class RDD[T: ClassTag](
*/
   def checkpoint() {
 if (context.checkpointDir.isEmpty) {
-  throw new Exception(Checkpoint directory has not been set in the 
SparkContext)
+  throw new SparkException(Checkpoint directory has not been set in the 
SparkContext)
 } else if (checkpointData.isEmpty) {
   checkpointData = Some(new RDDCheckpointData(this))
   checkpointData.get.markForCheckpoint()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4363][Doc] Update the Broadcast example

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 e27fa40ed - 29a6da372


[SPARK-4363][Doc] Update the Broadcast example

Author: zsxwing zsxw...@gmail.com

Closes #3226 from zsxwing/SPARK-4363 and squashes the following commits:

8109914 [zsxwing] Update the Broadcast example

(cherry picked from commit 861223ee5bea8e434a9ebb0d53f436ce23809f9c)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29a6da37
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29a6da37
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29a6da37

Branch: refs/heads/branch-1.2
Commit: 29a6da37257d8a165967392af6f452a404e445cd
Parents: e27fa40
Author: zsxwing zsxw...@gmail.com
Authored: Fri Nov 14 22:28:48 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 22:28:58 2014 -0800

--
 core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala | 2 +-
 docs/programming-guide.md  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/29a6da37/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
--
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 87f5cf9..a5ea478 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -39,7 +39,7 @@ import scala.reflect.ClassTag
  *
  * {{{
  * scala val broadcastVar = sc.broadcast(Array(1, 2, 3))
- * broadcastVar: spark.Broadcast[Array[Int]] = 
spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
+ * broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = 
Broadcast(0)
  *
  * scala broadcastVar.value
  * res0: Array[Int] = Array(1, 2, 3)

http://git-wip-us.apache.org/repos/asf/spark/blob/29a6da37/docs/programming-guide.md
--
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 18420af..9de2f91 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1131,7 +1131,7 @@ method. The code below shows this:
 
 {% highlight scala %}
 scala val broadcastVar = sc.broadcast(Array(1, 2, 3))
-broadcastVar: spark.Broadcast[Array[Int]] = 
spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
+broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
 
 scala broadcastVar.value
 res0: Array[Int] = Array(1, 2, 3)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4363][Doc] Update the Broadcast example

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master dba140582 - 861223ee5


[SPARK-4363][Doc] Update the Broadcast example

Author: zsxwing zsxw...@gmail.com

Closes #3226 from zsxwing/SPARK-4363 and squashes the following commits:

8109914 [zsxwing] Update the Broadcast example


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/861223ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/861223ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/861223ee

Branch: refs/heads/master
Commit: 861223ee5bea8e434a9ebb0d53f436ce23809f9c
Parents: dba1405
Author: zsxwing zsxw...@gmail.com
Authored: Fri Nov 14 22:28:48 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 22:28:48 2014 -0800

--
 core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala | 2 +-
 docs/programming-guide.md  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/861223ee/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
--
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 87f5cf9..a5ea478 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -39,7 +39,7 @@ import scala.reflect.ClassTag
  *
  * {{{
  * scala val broadcastVar = sc.broadcast(Array(1, 2, 3))
- * broadcastVar: spark.Broadcast[Array[Int]] = 
spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
+ * broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = 
Broadcast(0)
  *
  * scala broadcastVar.value
  * res0: Array[Int] = Array(1, 2, 3)

http://git-wip-us.apache.org/repos/asf/spark/blob/861223ee/docs/programming-guide.md
--
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 18420af..9de2f91 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1131,7 +1131,7 @@ method. The code below shows this:
 
 {% highlight scala %}
 scala val broadcastVar = sc.broadcast(Array(1, 2, 3))
-broadcastVar: spark.Broadcast[Array[Int]] = 
spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
+broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
 
 scala broadcastVar.value
 res0: Array[Int] = Array(1, 2, 3)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4260] Httpbroadcast should set connection timeout.

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 861223ee5 - 60969b033


[SPARK-4260] Httpbroadcast should set connection timeout.

Httpbroadcast sets read timeout but doesn't set connection timeout.

Author: Kousuke Saruta saru...@oss.nttdata.co.jp

Closes #3122 from sarutak/httpbroadcast-timeout and squashes the following 
commits:

c7f3a56 [Kousuke Saruta] Added Connection timeout for Http Connection to 
HttpBroadcast.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60969b03
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60969b03
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60969b03

Branch: refs/heads/master
Commit: 60969b0336930449a826821a48f83f65337e8856
Parents: 861223e
Author: Kousuke Saruta saru...@oss.nttdata.co.jp
Authored: Fri Nov 14 22:36:56 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 22:36:56 2014 -0800

--
 core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/60969b03/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
--
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 7dade04..31f0a46 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -191,10 +191,12 @@ private[broadcast] object HttpBroadcast extends Logging {
   logDebug(broadcast security enabled)
   val newuri = Utils.constructURIForAuthentication(new URI(url), 
securityManager)
   uc = newuri.toURL.openConnection()
+  uc.setConnectTimeout(httpReadTimeout)
   uc.setAllowUserInteraction(false)
 } else {
   logDebug(broadcast not using security)
   uc = new URL(url).openConnection()
+  uc.setConnectTimeout(httpReadTimeout)
 }
 
 val in = {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Added contains(key) to Metadata

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 60969b033 - cbddac236


Added contains(key) to Metadata

Add contains(key) to org.apache.spark.sql.catalyst.util.Metadata to test the 
existence of a key. Otherwise, Class Metadata's get methods may throw 
NoSuchElement exception if the key does not exist.
Testcases are added to MetadataSuite as well.

Author: kai kaiz...@eecs.berkeley.edu

Closes #3273 from kai-zeng/metadata-fix and squashes the following commits:

74b3d03 [kai] Added contains(key) to Metadata


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbddac23
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbddac23
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbddac23

Branch: refs/heads/master
Commit: cbddac23696d89b672dce380cc7360a873e27b3b
Parents: 60969b0
Author: kai kaiz...@eecs.berkeley.edu
Authored: Fri Nov 14 23:44:23 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 23:44:23 2014 -0800

--
 .../org/apache/spark/sql/catalyst/util/Metadata.scala  |  3 +++
 .../apache/spark/sql/catalyst/util/MetadataSuite.scala | 13 +
 2 files changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cbddac23/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
old mode 100644
new mode 100755
index 2f2082f..8172733
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
@@ -34,6 +34,9 @@ import org.json4s.jackson.JsonMethods._
  */
 sealed class Metadata private[util] (private[util] val map: Map[String, Any]) 
extends Serializable {
 
+  /** Tests whether this Metadata contains a binding for a key. */
+  def contains(key: String): Boolean = map.contains(key)
+
   /** Gets a Long. */
   def getLong(key: String): Long = get(key)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cbddac23/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
old mode 100644
new mode 100755
index 0063d31..f005b7d
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
@@ -56,17 +56,30 @@ class MetadataSuite extends FunSuite {
 .build()
 
   test(metadata builder and getters) {
+assert(age.contains(summary) === false)
+assert(age.contains(index) === true)
 assert(age.getLong(index) === 1L)
+assert(age.contains(average) === true)
 assert(age.getDouble(average) === 45.0)
+assert(age.contains(categorical) === true)
 assert(age.getBoolean(categorical) === false)
+assert(age.contains(name) === true)
 assert(age.getString(name) === age)
+assert(metadata.contains(purpose) === true)
 assert(metadata.getString(purpose) === ml)
+assert(metadata.contains(isBase) === true)
 assert(metadata.getBoolean(isBase) === false)
+assert(metadata.contains(summary) === true)
 assert(metadata.getMetadata(summary) === summary)
+assert(metadata.contains(long[]) === true)
 assert(metadata.getLongArray(long[]).toSeq === Seq(0L, 1L))
+assert(metadata.contains(double[]) === true)
 assert(metadata.getDoubleArray(double[]).toSeq === Seq(3.0, 4.0))
+assert(metadata.contains(boolean[]) === true)
 assert(metadata.getBooleanArray(boolean[]).toSeq === Seq(true, false))
+assert(gender.contains(categories) === true)
 assert(gender.getStringArray(categories).toSeq === Seq(male, female))
+assert(metadata.contains(features) === true)
 assert(metadata.getMetadataArray(features).toSeq === Seq(age, gender))
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Added contains(key) to Metadata

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 37716b795 - c044e1241


Added contains(key) to Metadata

Add contains(key) to org.apache.spark.sql.catalyst.util.Metadata to test the 
existence of a key. Otherwise, Class Metadata's get methods may throw 
NoSuchElement exception if the key does not exist.
Testcases are added to MetadataSuite as well.

Author: kai kaiz...@eecs.berkeley.edu

Closes #3273 from kai-zeng/metadata-fix and squashes the following commits:

74b3d03 [kai] Added contains(key) to Metadata

(cherry picked from commit cbddac23696d89b672dce380cc7360a873e27b3b)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c044e124
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c044e124
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c044e124

Branch: refs/heads/branch-1.2
Commit: c044e124115cc8e9ffb44d12c2744f33362f366f
Parents: 37716b7
Author: kai kaiz...@eecs.berkeley.edu
Authored: Fri Nov 14 23:44:23 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 23:44:40 2014 -0800

--
 .../org/apache/spark/sql/catalyst/util/Metadata.scala  |  3 +++
 .../apache/spark/sql/catalyst/util/MetadataSuite.scala | 13 +
 2 files changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c044e124/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
old mode 100644
new mode 100755
index 2f2082f..8172733
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
@@ -34,6 +34,9 @@ import org.json4s.jackson.JsonMethods._
  */
 sealed class Metadata private[util] (private[util] val map: Map[String, Any]) 
extends Serializable {
 
+  /** Tests whether this Metadata contains a binding for a key. */
+  def contains(key: String): Boolean = map.contains(key)
+
   /** Gets a Long. */
   def getLong(key: String): Long = get(key)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c044e124/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
old mode 100644
new mode 100755
index 0063d31..f005b7d
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
@@ -56,17 +56,30 @@ class MetadataSuite extends FunSuite {
 .build()
 
   test(metadata builder and getters) {
+assert(age.contains(summary) === false)
+assert(age.contains(index) === true)
 assert(age.getLong(index) === 1L)
+assert(age.contains(average) === true)
 assert(age.getDouble(average) === 45.0)
+assert(age.contains(categorical) === true)
 assert(age.getBoolean(categorical) === false)
+assert(age.contains(name) === true)
 assert(age.getString(name) === age)
+assert(metadata.contains(purpose) === true)
 assert(metadata.getString(purpose) === ml)
+assert(metadata.contains(isBase) === true)
 assert(metadata.getBoolean(isBase) === false)
+assert(metadata.contains(summary) === true)
 assert(metadata.getMetadata(summary) === summary)
+assert(metadata.contains(long[]) === true)
 assert(metadata.getLongArray(long[]).toSeq === Seq(0L, 1L))
+assert(metadata.contains(double[]) === true)
 assert(metadata.getDoubleArray(double[]).toSeq === Seq(3.0, 4.0))
+assert(metadata.contains(boolean[]) === true)
 assert(metadata.getBooleanArray(boolean[]).toSeq === Seq(true, false))
+assert(gender.contains(categories) === true)
 assert(gender.getStringArray(categories).toSeq === Seq(male, female))
+assert(metadata.contains(features) === true)
 assert(metadata.getMetadataArray(features).toSeq === Seq(age, gender))
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-2321] Several progress API improvements / refactorings

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 c044e1241 - 9eac5fee6


[SPARK-2321] Several progress API improvements / refactorings

This PR refactors / extends the status API introduced in #2696.

- Change StatusAPI from a mixin trait to a class.  Before, the new status API 
methods were directly accessible through SparkContext, whereas now they're 
accessed through a `sc.statusAPI` field.  As long as we were going to add these 
methods directly to SparkContext, the mixin trait seemed like a good idea, but 
this might be simpler to reason about and may avoid pitfalls that I've run into 
while attempting to refactor other parts of SparkContext to use mixins (see 
#3071, for example).
- Change the name from SparkStatusAPI to SparkStatusTracker.
- Make `getJobIdsForGroup(null)` return ids for jobs that aren't associated 
with any job group.
- Add `getActiveStageIds()` and `getActiveJobIds()` methods that return the ids 
of whatever's currently active in this SparkContext.  This should simplify 
davies's progress bar code.

Author: Josh Rosen joshro...@databricks.com

Closes #3197 from JoshRosen/progress-api-improvements and squashes the 
following commits:

30b0afa [Josh Rosen] Rename SparkStatusAPI to SparkStatusTracker.
d1b08d8 [Josh Rosen] Add missing newlines
2cc7353 [Josh Rosen] Add missing file.
d5eab1f [Josh Rosen] Add getActive[Stage|Job]Ids() methods.
a227984 [Josh Rosen] getJobIdsForGroup(null) should return jobs for default 
group
c47e294 [Josh Rosen] Remove StatusAPI mixin trait.

(cherry picked from commit 40eb8b6ef3a67e36d0d9492c044981a1da76351d)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9eac5fee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9eac5fee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9eac5fee

Branch: refs/heads/branch-1.2
Commit: 9eac5fee64def9a18d8961069f631a176f339a5b
Parents: c044e12
Author: Josh Rosen joshro...@databricks.com
Authored: Fri Nov 14 23:46:25 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 23:46:42 2014 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   |  68 -
 .../scala/org/apache/spark/SparkStatusAPI.scala | 142 ---
 .../org/apache/spark/SparkStatusTracker.scala   | 107 ++
 .../spark/api/java/JavaSparkContext.scala   |  21 +--
 .../spark/api/java/JavaSparkStatusTracker.scala |  72 ++
 .../scala/org/apache/spark/StatusAPISuite.scala |  78 --
 .../org/apache/spark/StatusTrackerSuite.scala   |  89 
 .../spark/examples/JavaStatusAPIDemo.java   |  70 -
 .../spark/examples/JavaStatusTrackerDemo.java   |  70 +
 9 files changed, 407 insertions(+), 310 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9eac5fee/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 03ea672..65edeef 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -61,7 +62,7 @@ import org.apache.spark.util._
  *   this config overrides the default configs as well as system properties.
  */
 
-class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
+class SparkContext(config: SparkConf) extends Logging {
 
   // This is used only by YARN for now, but should be relevant to other 
cluster types (Mesos,
   // etc) too. This is typically generated from 
InputFormatInfo.computePreferredLocations. It
@@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends 
SparkStatusAPI with Logging {
   private[spark] val jobProgressListener = new JobProgressListener(conf)
   listenerBus.addListener(jobProgressListener)
 
+  val statusTracker = new SparkStatusTracker(this)
+
   // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
 if (conf.getBoolean(spark.ui.enabled, true)) {
@@ -1002,6 +1005,69 @@ class SparkContext(config: SparkConf) extends 
SparkStatusAPI with Logging {
   def version = SPARK_VERSION
 
   /**
+   * Return a map from the slave to the max memory available for caching and 
the remaining
+   * memory available for caching.
+   */
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
+

spark git commit: [SPARK-2321] Several progress API improvements / refactorings

2014-11-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master cbddac236 - 40eb8b6ef


[SPARK-2321] Several progress API improvements / refactorings

This PR refactors / extends the status API introduced in #2696.

- Change StatusAPI from a mixin trait to a class.  Before, the new status API 
methods were directly accessible through SparkContext, whereas now they're 
accessed through a `sc.statusAPI` field.  As long as we were going to add these 
methods directly to SparkContext, the mixin trait seemed like a good idea, but 
this might be simpler to reason about and may avoid pitfalls that I've run into 
while attempting to refactor other parts of SparkContext to use mixins (see 
#3071, for example).
- Change the name from SparkStatusAPI to SparkStatusTracker.
- Make `getJobIdsForGroup(null)` return ids for jobs that aren't associated 
with any job group.
- Add `getActiveStageIds()` and `getActiveJobIds()` methods that return the ids 
of whatever's currently active in this SparkContext.  This should simplify 
davies's progress bar code.

Author: Josh Rosen joshro...@databricks.com

Closes #3197 from JoshRosen/progress-api-improvements and squashes the 
following commits:

30b0afa [Josh Rosen] Rename SparkStatusAPI to SparkStatusTracker.
d1b08d8 [Josh Rosen] Add missing newlines
2cc7353 [Josh Rosen] Add missing file.
d5eab1f [Josh Rosen] Add getActive[Stage|Job]Ids() methods.
a227984 [Josh Rosen] getJobIdsForGroup(null) should return jobs for default 
group
c47e294 [Josh Rosen] Remove StatusAPI mixin trait.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40eb8b6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40eb8b6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40eb8b6e

Branch: refs/heads/master
Commit: 40eb8b6ef3a67e36d0d9492c044981a1da76351d
Parents: cbddac2
Author: Josh Rosen joshro...@databricks.com
Authored: Fri Nov 14 23:46:25 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Fri Nov 14 23:46:25 2014 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   |  68 -
 .../scala/org/apache/spark/SparkStatusAPI.scala | 142 ---
 .../org/apache/spark/SparkStatusTracker.scala   | 107 ++
 .../spark/api/java/JavaSparkContext.scala   |  21 +--
 .../spark/api/java/JavaSparkStatusTracker.scala |  72 ++
 .../scala/org/apache/spark/StatusAPISuite.scala |  78 --
 .../org/apache/spark/StatusTrackerSuite.scala   |  89 
 .../spark/examples/JavaStatusAPIDemo.java   |  70 -
 .../spark/examples/JavaStatusTrackerDemo.java   |  70 +
 9 files changed, 407 insertions(+), 310 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/40eb8b6e/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 03ea672..65edeef 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -61,7 +62,7 @@ import org.apache.spark.util._
  *   this config overrides the default configs as well as system properties.
  */
 
-class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
+class SparkContext(config: SparkConf) extends Logging {
 
   // This is used only by YARN for now, but should be relevant to other 
cluster types (Mesos,
   // etc) too. This is typically generated from 
InputFormatInfo.computePreferredLocations. It
@@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends 
SparkStatusAPI with Logging {
   private[spark] val jobProgressListener = new JobProgressListener(conf)
   listenerBus.addListener(jobProgressListener)
 
+  val statusTracker = new SparkStatusTracker(this)
+
   // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
 if (conf.getBoolean(spark.ui.enabled, true)) {
@@ -1002,6 +1005,69 @@ class SparkContext(config: SparkConf) extends 
SparkStatusAPI with Logging {
   def version = SPARK_VERSION
 
   /**
+   * Return a map from the slave to the max memory available for caching and 
the remaining
+   * memory available for caching.
+   */
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
+env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =
+  (blockManagerId.host + : +