git commit: [SPARK-1745] Move interrupted flag from TaskContext constructor (minor)

2014-05-10 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master 44dd57fb6 - c3f8b78c2


[SPARK-1745] Move interrupted flag from TaskContext constructor (minor)

It makes little sense to start a TaskContext that is interrupted. Indeed, I 
searched for all use cases of it and didn't find a single instance in which 
`interrupted` is true on construction.

This was inspired by reviewing #640, which adds an additional `@volatile var 
completed` that is similar. These are not the most urgent changes, but I wanted 
to push them out before I forget.

Author: Andrew Or andrewo...@gmail.com

Closes #675 from andrewor14/task-context and squashes the following commits:

9575e02 [Andrew Or] Add space
69455d1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
task-context
c471490 [Andrew Or] Oops, removed one flag too many. Adding it back.
85311f8 [Andrew Or] Move interrupted flag from TaskContext constructor


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3f8b78c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3f8b78c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3f8b78c

Branch: refs/heads/master
Commit: c3f8b78c211df6c5adae74f37e39fb55baeff723
Parents: 44dd57f
Author: Andrew Or andrewo...@gmail.com
Authored: Thu May 8 12:13:07 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Thu May 8 12:13:07 2014 -0700

--
 .../scala/org/apache/spark/TaskContext.scala| 20 +++-
 .../apache/spark/scheduler/ShuffleMapTask.scala |  3 +--
 .../java/org/apache/spark/JavaAPISuite.java |  2 +-
 .../org/apache/spark/CacheManagerSuite.scala| 10 +++---
 .../scala/org/apache/spark/PipedRDDSuite.scala  |  4 +---
 5 files changed, 17 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c3f8b78c/core/src/main/scala/org/apache/spark/TaskContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index fc48127..51f40c3 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -28,13 +28,12 @@ import org.apache.spark.executor.TaskMetrics
  */
 @DeveloperApi
 class TaskContext(
-  val stageId: Int,
-  val partitionId: Int,
-  val attemptId: Long,
-  val runningLocally: Boolean = false,
-  @volatile var interrupted: Boolean = false,
-  private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
-) extends Serializable {
+val stageId: Int,
+val partitionId: Int,
+val attemptId: Long,
+val runningLocally: Boolean = false,
+private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends Serializable {
 
   @deprecated(use partitionId, 0.8.1)
   def splitId = partitionId
@@ -42,7 +41,10 @@ class TaskContext(
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[() = Unit]
 
-  // Set to true when the task is completed, before the onCompleteCallbacks 
are executed.
+  // Whether the corresponding task has been killed.
+  @volatile var interrupted: Boolean = false
+
+  // Whether the task has completed, before the onCompleteCallbacks are 
executed.
   @volatile var completed: Boolean = false
 
   /**
@@ -58,6 +60,6 @@ class TaskContext(
   def executeOnCompleteCallbacks() {
 completed = true
 // Process complete callbacks in the reverse order of registration
-onCompleteCallbacks.reverse.foreach{_()}
+onCompleteCallbacks.reverse.foreach { _() }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c3f8b78c/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 2259df0..4b0324f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -23,7 +23,6 @@ import java.io._
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.mutable.HashMap
-import scala.util.Try
 
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
@@ -70,7 +69,7 @@ private[spark] object ShuffleMapTask {
   }
 
   // Since both the JarSet and FileSet have the same format this is used for 
both.
-  def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = {
+  def deserializeFileSet(bytes: Array[Byte]): HashMap[String, Long] = {
 val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
 val objIn = new 

git commit: [SPARK-1688] Propagate PySpark worker stderr to driver

2014-05-10 Thread adav
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 0759ee790 - 82c8e89c9


[SPARK-1688] Propagate PySpark worker stderr to driver

When at least one of the following conditions is true, PySpark cannot be loaded:

1. PYTHONPATH is not set
2. PYTHONPATH does not contain the python directory (or jar, in the case of 
YARN)
3. The jar does not contain pyspark files (YARN)
4. The jar does not contain py4j files (YARN)

However, we currently throw the same random `java.io.EOFException` for all of 
the above cases, when trying to read from the python daemon's output. This 
message is super unhelpful.

This PR includes the python stderr and the PYTHONPATH in the exception 
propagated to the driver. Now, the exception message looks something like:

```
Error from python worker:
  : No module named pyspark
PYTHONPATH was:
  /path/to/spark/python:/path/to/some/jar
java.io.EOFException
  stack trace
```

whereas before it was just

```
java.io.EOFException
  stack trace
```

Author: Andrew Or andrewo...@gmail.com

Closes #603 from andrewor14/pyspark-exception and squashes the following 
commits:

10d65d3 [Andrew Or] Throwable - Exception, worker - daemon
862d1d7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
pyspark-exception
a5ed798 [Andrew Or] Use block string and interpolation instead of var (minor)
cc09c45 [Andrew Or] Account for the fact that the python daemon may not have 
terminated yet
444f019 [Andrew Or] Use the new RedirectThread + include system PYTHONPATH
aab00ae [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
pyspark-exception
0cc2402 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
pyspark-exception
783efe2 [Andrew Or] Make python daemon stderr indentation consistent
9524172 [Andrew Or] Avoid potential NPE / error stream contention + Move things 
around
29f9688 [Andrew Or] Add back original exception type
e92d36b [Andrew Or] Include python worker stderr in the exception propagated to 
the driver
7c69360 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
pyspark-exception
cdbc185 [Andrew Or] Fix python attribute not found exception when PYTHONPATH is 
not set
dcc0353 [Andrew Or] Check both python and system environment variables for 
PYTHONPATH
6c09c21 [Andrew Or] Validate PYTHONPATH and PySpark modules before starting 
python workers

(cherry picked from commit 5200872243aa5906dc8a06772e61d75f19557aac)
Signed-off-by: Aaron Davidson aa...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82c8e89c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82c8e89c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82c8e89c

Branch: refs/heads/branch-1.0
Commit: 82c8e89c9581c45c7878b8f406cf3d90d4b0d74c
Parents: 0759ee7
Author: Andrew Or andrewo...@gmail.com
Authored: Wed May 7 14:35:22 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Wed May 7 14:35:37 2014 -0700

--
 .../apache/spark/api/python/PythonUtils.scala   |  27 +++-
 .../spark/api/python/PythonWorkerFactory.scala  | 136 ---
 .../org/apache/spark/deploy/PythonRunner.scala  |  24 +---
 .../scala/org/apache/spark/util/Utils.scala |  37 +
 4 files changed, 123 insertions(+), 101 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/82c8e89c/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index cf69fa1..6d3e257 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.python
 
-import java.io.File
+import java.io.{File, InputStream, IOException, OutputStream}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -40,3 +40,28 @@ private[spark] object PythonUtils {
 paths.filter(_ != ).mkString(File.pathSeparator)
   }
 }
+
+
+/**
+ * A utility class to redirect the child process's stdout or stderr.
+ */
+private[spark] class RedirectThread(
+in: InputStream,
+out: OutputStream,
+name: String)
+  extends Thread(name) {
+
+  setDaemon(true)
+  override def run() {
+scala.util.control.Exception.ignoring(classOf[IOException]) {
+  // FIXME: We copy the stream on the level of bytes to avoid encoding 
problems.
+  val buf = new Array[Byte](1024)
+  var len = in.read(buf)
+  while (len != -1) {
+out.write(buf, 0, len)
+out.flush()
+len = in.read(buf)
+  }
+}
+  }
+}


git commit: [SPARK-1743][MLLIB] add loadLibSVMFile and saveAsLibSVMFile to pyspark

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 879bd - bb90e87f6


[SPARK-1743][MLLIB] add loadLibSVMFile and saveAsLibSVMFile to pyspark

Make loading/saving labeled data easier for pyspark users.

Also changed type check in `SparseVector` to allow numpy integers.

Author: Xiangrui Meng m...@databricks.com

Closes #672 from mengxr/pyspark-mllib-util and squashes the following commits:

2943fa7 [Xiangrui Meng] format docs
d61668d [Xiangrui Meng] add loadLibSVMFile and saveAsLibSVMFile to pyspark
(cherry picked from commit 3188553f73970270717a7fee4a116e29ad4becc9)

Signed-off-by: Patrick Wendell pwend...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb90e87f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb90e87f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb90e87f

Branch: refs/heads/branch-1.0
Commit: bb90e87f6a1cad72865749692d46e5ad6c7d93d7
Parents: 879
Author: Xiangrui Meng m...@databricks.com
Authored: Wed May 7 16:01:11 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Wed May 7 16:01:18 2014 -0700

--
 python/pyspark/mllib/linalg.py |   3 +-
 python/pyspark/mllib/util.py   | 177 
 2 files changed, 178 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bb90e87f/python/pyspark/mllib/linalg.py
--
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 0aa3a51..7511ca7 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -49,8 +49,7 @@ class SparseVector(object):
  print SparseVector(4, [1, 3], [1.0, 5.5])
 [1: 1.0, 3: 5.5]
 
-assert type(size) == int, first argument must be an int
-self.size = size
+self.size = int(size)
 assert 1 = len(args) = 2, must pass either 2 or 3 arguments
 if len(args) == 1:
 pairs = args[0]

http://git-wip-us.apache.org/repos/asf/spark/blob/bb90e87f/python/pyspark/mllib/util.py
--
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
new file mode 100644
index 000..50d0cdd
--- /dev/null
+++ b/python/pyspark/mllib/util.py
@@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numpy as np
+
+from pyspark.mllib.linalg import Vectors, SparseVector
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib._common import _convert_vector
+
+class MLUtils:
+
+Helper methods to load, save and pre-process data used in MLlib.
+
+
+@staticmethod
+def _parse_libsvm_line(line, multiclass):
+
+Parses a line in LIBSVM format into (label, indices, values).
+
+items = line.split(None)
+label = float(items[0])
+if not multiclass:
+label = 1.0 if label  0.5 else 0.0
+nnz = len(items) - 1
+indices = np.zeros(nnz, dtype=np.int32)
+values = np.zeros(nnz)
+for i in xrange(nnz):
+index, value = items[1 + i].split(:)
+indices[i] = int(index) - 1
+values[i] = float(value)
+return label, indices, values
+
+
+@staticmethod
+def _convert_labeled_point_to_libsvm(p):
+Converts a LabeledPoint to a string in LIBSVM format.
+items = [str(p.label)]
+v = _convert_vector(p.features)
+if type(v) == np.ndarray:
+for i in xrange(len(v)):
+items.append(str(i + 1) + : + str(v[i]))
+elif type(v) == SparseVector:
+nnz = len(v.indices)
+for i in xrange(nnz):
+items.append(str(v.indices[i] + 1) + : + str(v.values[i]))
+else:
+raise TypeError(_convert_labeled_point_to_libsvm needs either 
ndarray or SparseVector
+ but got  % type(v))
+return  .join(items)
+
+
+@staticmethod
+def 

git commit: Fixing typo in als.py

2014-05-10 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 6f701ff55 - 98944a973


Fixing typo in als.py

XtY should be Xty.

Author: Evan Sparks evan.spa...@gmail.com

Closes #696 from etrain/patch-2 and squashes the following commits:

634cb8d [Evan Sparks] Fixing typo in als.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98944a97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98944a97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98944a97

Branch: refs/heads/branch-1.0
Commit: 98944a9734389cd4400516a1eb3afa5376f44927
Parents: 6f701ff
Author: Evan Sparks evan.spa...@gmail.com
Authored: Thu May 8 13:07:30 2014 -0700
Committer: Shivaram Venkataraman shiva...@eecs.berkeley.edu
Committed: Thu May 8 16:49:33 2014 -0700

--
 examples/src/main/python/als.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/98944a97/examples/src/main/python/als.py
--
diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index 33700ab..01552dc 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -38,7 +38,7 @@ def update(i, vec, mat, ratings):
 ff = mat.shape[1]
 
 XtX = mat.T * mat
-XtY = mat.T * ratings[i, :].T
+Xty = mat.T * ratings[i, :].T
 
 for j in range(ff):
 XtX[j,j] += LAMBDA * uu



git commit: [HOTFIX] SPARK-1637: There are some Streaming examples added after the PR #571 was last updated.

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 51e277557 - ade47562b


[HOTFIX] SPARK-1637: There are some Streaming examples added after the PR #571 
was last updated.

This resulted in Compilation Errors.
cc @mateiz project not compiling currently.

Author: Sandeep sand...@techaddict.me

Closes #673 from techaddict/SPARK-1637-HOTFIX and squashes the following 
commits:

b512f4f [Sandeep] [SPARK-1637][HOTFIX] There are some Streaming examples added 
after the PR #571 was last updated. This resulted in Compilation Errors.
(cherry picked from commit fdae095de2daa1fc3b343c05e515235756d856a4)

Signed-off-by: Patrick Wendell pwend...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ade47562
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ade47562
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ade47562

Branch: refs/heads/branch-1.0
Commit: ade47562b75c2056359ac63e843360677a0deab1
Parents: 51e2775
Author: Sandeep sand...@techaddict.me
Authored: Tue May 6 21:55:05 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Tue May 6 21:55:16 2014 -0700

--
 .../examples/streaming/JavaCustomReceiver.java  | 151 ++
 .../streaming/examples/JavaCustomReceiver.java  | 153 ---
 .../examples/streaming/CustomReceiver.scala | 108 +
 .../streaming/examples/CustomReceiver.scala | 108 -
 4 files changed, 259 insertions(+), 261 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ade47562/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
new file mode 100644
index 000..7f558f3
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes is 
interpreted as
+ * text and \n delimited lines are considered as records. They are then 
counted and printed.
+ *
+ * Usage: JavaCustomReceiver master hostname port
+ *   master is the Spark master URL. In local mode, master should be 
'local[n]' with n  1.
+ *   hostname and port of the TCP server that Spark Streaming would 
connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *`$ nc -lk `
+ * and then run the example
+ *`$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] 
localhost `
+ */
+
+public class JavaCustomReceiver extends ReceiverString {
+  private static final Pattern SPACE = Pattern.compile( );
+
+  public static void main(String[] args) {
+if (args.length  3) {
+  System.err.println(Usage: JavaNetworkWordCount master hostname 
port\n +
+  In local mode, master should be 'local[n]' with n  1);
+  System.exit(1);
+}
+
+StreamingExamples.setStreamingLogLevels();
+
+// Create 

git commit: [SQL] Improve SparkSQL Aggregates

2014-05-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 6ed7e2cd0 - 19c8fb02b


[SQL] Improve SparkSQL Aggregates

* Add native min/max (was using hive before).
* Handle nulls correctly in Avg and Sum.

Author: Michael Armbrust mich...@databricks.com

Closes #683 from marmbrus/aggFixes and squashes the following commits:

64fe30b [Michael Armbrust] Improve SparkSQL Aggregates * Add native min/max 
(was using hive before). * Handle nulls correctly in Avg and Sum.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19c8fb02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19c8fb02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19c8fb02

Branch: refs/heads/master
Commit: 19c8fb02bc2c2f76c3c45bfff4b8d093be9d7c66
Parents: 6ed7e2c
Author: Michael Armbrust mich...@databricks.com
Authored: Thu May 8 01:08:43 2014 -0400
Committer: Reynold Xin r...@apache.org
Committed: Thu May 8 01:08:43 2014 -0400

--
 .../apache/spark/sql/catalyst/SqlParser.scala   |  4 +
 .../sql/catalyst/expressions/aggregates.scala   | 85 +---
 .../org/apache/spark/sql/SQLQuerySuite.scala|  7 ++
 .../scala/org/apache/spark/sql/TestData.scala   | 10 +++
 4 files changed, 96 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/19c8fb02/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 8c76a3a..b3a3a1e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -114,6 +114,8 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
   protected val JOIN = Keyword(JOIN)
   protected val LEFT = Keyword(LEFT)
   protected val LIMIT = Keyword(LIMIT)
+  protected val MAX = Keyword(MAX)
+  protected val MIN = Keyword(MIN)
   protected val NOT = Keyword(NOT)
   protected val NULL = Keyword(NULL)
   protected val ON = Keyword(ON)
@@ -318,6 +320,8 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
 COUNT ~ ( ~ DISTINCT ~ expression ~ ) ^^ { case exp = 
CountDistinct(exp :: Nil) } |
 FIRST ~ ( ~ expression ~ ) ^^ { case exp = First(exp) } |
 AVG ~ ( ~ expression ~ ) ^^ { case exp = Average(exp) } |
+MIN ~ ( ~ expression ~ ) ^^ { case exp = Min(exp) } |
+MAX ~ ( ~ expression ~ ) ^^ { case exp = Max(exp) } |
 IF ~ ( ~ expression ~ , ~ expression ~ , ~ expression ~ ) ^^ {
   case c ~ , ~ t ~ , ~ f = If(c,t,f)
 } |

http://git-wip-us.apache.org/repos/asf/spark/blob/19c8fb02/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index b152f95..d37 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -86,6 +86,67 @@ abstract class AggregateFunction
   override def newInstance() = makeCopy(productIterator.map { case a: AnyRef 
= a }.toArray)
 }
 
+case class Min(child: Expression) extends PartialAggregate with 
trees.UnaryNode[Expression] {
+  override def references = child.references
+  override def nullable = child.nullable
+  override def dataType = child.dataType
+  override def toString = sMIN($child)
+
+  override def asPartial: SplitEvaluation = {
+val partialMin = Alias(Min(child), PartialMin)()
+SplitEvaluation(Min(partialMin.toAttribute), partialMin :: Nil)
+  }
+
+  override def newInstance() = new MinFunction(child, this)
+}
+
+case class MinFunction(expr: Expression, base: AggregateExpression) extends 
AggregateFunction {
+  def this() = this(null, null) // Required for serialization.
+
+  var currentMin: Any = _
+
+  override def update(input: Row): Unit = {
+if (currentMin == null) {
+  currentMin = expr.eval(input)
+} else if(GreaterThan(Literal(currentMin, expr.dataType), 
expr).eval(input) == true) {
+  currentMin = expr.eval(input)
+}
+  }
+
+  override def eval(input: Row): Any = currentMin
+}
+
+case class Max(child: Expression) extends PartialAggregate with 
trees.UnaryNode[Expression] {
+  override def references = child.references
+  override def nullable = child.nullable
+  override def dataType = child.dataType
+  override def toString = sMAX($child)
+
+  override def 

git commit: MLlib documentation fix

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 322b1808d - d38febee4


MLlib documentation fix

Fixed the documentation for that `loadLibSVMData` is changed to 
`loadLibSVMFile`.

Author: DB Tsai dbt...@alpinenow.com

Closes #703 from dbtsai/dbtsai-docfix and squashes the following commits:

71dd508 [DB Tsai] loadLibSVMData is changed to loadLibSVMFile


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d38febee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d38febee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d38febee

Branch: refs/heads/master
Commit: d38febee46ed156b0c8ec64757db6c290e488421
Parents: 322b180
Author: DB Tsai dbt...@alpinenow.com
Authored: Thu May 8 17:52:32 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Thu May 8 17:52:32 2014 -0700

--
 docs/mllib-basics.md | 8 
 docs/mllib-linear-methods.md | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d38febee/docs/mllib-basics.md
--
diff --git a/docs/mllib-basics.md b/docs/mllib-basics.md
index 7043088..aa9321a 100644
--- a/docs/mllib-basics.md
+++ b/docs/mllib-basics.md
@@ -184,7 +184,7 @@ After loading, the feature indices are converted to 
zero-based.
 div class=codetabs
 div data-lang=scala markdown=1
 
-[`MLUtils.loadLibSVMData`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$)
 reads training
+[`MLUtils.loadLibSVMFile`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$)
 reads training
 examples stored in LIBSVM format.
 
 {% highlight scala %}
@@ -192,12 +192,12 @@ import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 
-val training: RDD[LabeledPoint] = MLUtils.loadLibSVMData(sc, 
mllib/data/sample_libsvm_data.txt)
+val training: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, 
mllib/data/sample_libsvm_data.txt)
 {% endhighlight %}
 /div
 
 div data-lang=java markdown=1
-[`MLUtils.loadLibSVMData`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$)
 reads training
+[`MLUtils.loadLibSVMFile`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$)
 reads training
 examples stored in LIBSVM format.
 
 {% highlight java %}
@@ -205,7 +205,7 @@ import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.util.MLUtils;
 import org.apache.spark.rdd.RDDimport;
 
-RDDLabeledPoint training = MLUtils.loadLibSVMData(jsc, 
mllib/data/sample_libsvm_data.txt);
+RDDLabeledPoint training = MLUtils.loadLibSVMFile(jsc, 
mllib/data/sample_libsvm_data.txt);
 {% endhighlight %}
 /div
 /div

http://git-wip-us.apache.org/repos/asf/spark/blob/d38febee/docs/mllib-linear-methods.md
--
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 40b7a7f..eff617d 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -186,7 +186,7 @@ import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.MLUtils
 
 // Load training data in LIBSVM format.
-val data = MLUtils.loadLibSVMData(sc, mllib/data/sample_libsvm_data.txt)
+val data = MLUtils.loadLibSVMFile(sc, mllib/data/sample_libsvm_data.txt)
 
 // Split data into training (60%) and test (40%).
 val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)



git commit: Add Python includes to path before depickling broadcast values

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 71ad53f81 - 2a669a70d


Add Python includes to path before depickling broadcast values

This fixes https://issues.apache.org/jira/browse/SPARK-1731 by adding the 
Python includes to the PYTHONPATH before depickling the broadcast values

@airhorns

Author: Bouke van der Bijl boukevanderb...@gmail.com

Closes #656 from bouk/python-includes-before-broadcast and squashes the 
following commits:

7b0dfe4 [Bouke van der Bijl] Add Python includes to path before depickling 
broadcast values
(cherry picked from commit 3776f2f283842543ff766398292532c6e94221cc)

Signed-off-by: Patrick Wendell pwend...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a669a70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a669a70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a669a70

Branch: refs/heads/branch-1.0
Commit: 2a669a70d09c33480d9db2d15e4a2f69dd214c77
Parents: 71ad53f
Author: Bouke van der Bijl boukevanderb...@gmail.com
Authored: Sat May 10 13:02:13 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Sat May 10 13:02:23 2014 -0700

--
 .../scala/org/apache/spark/api/python/PythonRDD.scala | 10 +-
 python/pyspark/worker.py  | 14 +++---
 2 files changed, 12 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2a669a70/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fecd976..388b838 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -179,6 +179,11 @@ private[spark] class PythonRDD[T: ClassTag](
 dataOut.writeInt(split.index)
 // sparkFilesDir
 PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
+// Python includes (*.zip and *.egg files)
+dataOut.writeInt(pythonIncludes.length)
+for (include - pythonIncludes) {
+  PythonRDD.writeUTF(include, dataOut)
+}
 // Broadcast variables
 dataOut.writeInt(broadcastVars.length)
 for (broadcast - broadcastVars) {
@@ -186,11 +191,6 @@ private[spark] class PythonRDD[T: ClassTag](
   dataOut.writeInt(broadcast.value.length)
   dataOut.write(broadcast.value)
 }
-// Python includes (*.zip and *.egg files)
-dataOut.writeInt(pythonIncludes.length)
-for (include - pythonIncludes) {
-  PythonRDD.writeUTF(include, dataOut)
-}
 dataOut.flush()
 // Serialized command:
 dataOut.writeInt(command.length)

http://git-wip-us.apache.org/repos/asf/spark/blob/2a669a70/python/pyspark/worker.py
--
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4c214ef..f43210c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -56,13 +56,6 @@ def main(infile, outfile):
 SparkFiles._root_directory = spark_files_dir
 SparkFiles._is_running_on_worker = True
 
-# fetch names and values of broadcast variables
-num_broadcast_variables = read_int(infile)
-for _ in range(num_broadcast_variables):
-bid = read_long(infile)
-value = pickleSer._read_with_length(infile)
-_broadcastRegistry[bid] = Broadcast(bid, value)
-
 # fetch names of includes (*.zip and *.egg files) and construct 
PYTHONPATH
 sys.path.append(spark_files_dir) # *.py files that were added will be 
copied here
 num_python_includes =  read_int(infile)
@@ -70,6 +63,13 @@ def main(infile, outfile):
 filename = utf8_deserializer.loads(infile)
 sys.path.append(os.path.join(spark_files_dir, filename))
 
+# fetch names and values of broadcast variables
+num_broadcast_variables = read_int(infile)
+for _ in range(num_broadcast_variables):
+bid = read_long(infile)
+value = pickleSer._read_with_length(infile)
+_broadcastRegistry[bid] = Broadcast(bid, value)
+
 command = pickleSer._read_with_length(infile)
 (func, deserializer, serializer) = command
 init_time = time.time()



git commit: SPARK-1686: keep schedule() calling in the main thread

2014-05-10 Thread adav
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 8202276c9 - adf8cdd0b


SPARK-1686: keep schedule() calling in the main thread

https://issues.apache.org/jira/browse/SPARK-1686

moved from original JIRA (by @markhamstra):

In deploy.master.Master, the completeRecovery method is the last thing to be 
called when a standalone Master is recovering from failure. It is responsible 
for resetting some state, relaunching drivers, and eventually resuming its 
scheduling duties.

There are currently four places in Master.scala where completeRecovery is 
called. Three of them are from within the actor's receive method, and aren't 
problems. The last starts from within receive when the ElectedLeader message is 
received, but the actual completeRecovery() call is made from the Akka 
scheduler. That means that it will execute on a different scheduler thread, and 
Master itself will end up running (i.e., schedule() ) from that Akka scheduler 
thread.

In this PR, I added a new master message TriggerSchedule to trigger the local 
call of schedule() in the scheduler thread

Author: CodingCat zhunans...@gmail.com

Closes #639 from CodingCat/SPARK-1686 and squashes the following commits:

81bb4ca [CodingCat] rename variable
69e0a2a [CodingCat] style fix
36a2ac0 [CodingCat] address Aaron's comments
ec9b7bb [CodingCat] address the comments
02b37ca [CodingCat] keep schedule() calling in the main thread

(cherry picked from commit 2f452cbaf35dbc609ab48ec0ee5e3dd7b6b9b790)
Signed-off-by: Aaron Davidson aa...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adf8cdd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adf8cdd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adf8cdd0

Branch: refs/heads/branch-1.0
Commit: adf8cdd0b29731325f08552d050c43fe1bbd724f
Parents: 8202276
Author: CodingCat zhunans...@gmail.com
Authored: Fri May 9 21:50:23 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Fri May 9 21:52:40 2014 -0700

--
 .../org/apache/spark/deploy/master/Master.scala  | 15 ---
 1 file changed, 12 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/adf8cdd0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index fdb633b..f254f55 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -104,6 +104,8 @@ private[spark] class Master(
 
   var leaderElectionAgent: ActorRef = _
 
+  private var recoveryCompletionTask: Cancellable = _
+
   // As a temporary workaround before better ways of configuring memory, we 
allow users to set
   // a flag that will perform round-robin scheduling across the nodes 
(spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a 
small # of nodes.
@@ -152,6 +154,10 @@ private[spark] class Master(
   }
 
   override def postStop() {
+// prevent the CompleteRecovery message sending to restarted master
+if (recoveryCompletionTask != null) {
+  recoveryCompletionTask.cancel()
+}
 webUi.stop()
 fileSystemsUsed.foreach(_.close())
 masterMetricsSystem.stop()
@@ -171,10 +177,13 @@ private[spark] class Master(
   logInfo(I have been elected leader! New state:  + state)
   if (state == RecoveryState.RECOVERING) {
 beginRecovery(storedApps, storedDrivers, storedWorkers)
-context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { 
completeRecovery() }
+recoveryCompletionTask = 
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
+  CompleteRecovery)
   }
 }
 
+case CompleteRecovery = completeRecovery()
+
 case RevokedLeadership = {
   logError(Leadership has been revoked -- master shutting down.)
   System.exit(0)
@@ -465,7 +474,7 @@ private[spark] class Master(
* Schedule the currently available resources among waiting apps. This 
method will be called
* every time a new app joins or resource availability changes.
*/
-  def schedule() {
+  private def schedule() {
 if (state != RecoveryState.ALIVE) { return }
 
 // First schedule drivers, they take strict precedence over applications
@@ -485,7 +494,7 @@ private[spark] class Master(
   // Try to spread out each app among all the nodes, until it has all its 
cores
   for (app - waitingApps if app.coresLeft  0) {
 val usableWorkers = workers.toArray.filter(_.state == 
WorkerState.ALIVE)
-   .filter(canUse(app, 

git commit: Add Python includes to path before depickling broadcast values

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master c05d11bb3 - 3776f2f28


Add Python includes to path before depickling broadcast values

This fixes https://issues.apache.org/jira/browse/SPARK-1731 by adding the 
Python includes to the PYTHONPATH before depickling the broadcast values

@airhorns

Author: Bouke van der Bijl boukevanderb...@gmail.com

Closes #656 from bouk/python-includes-before-broadcast and squashes the 
following commits:

7b0dfe4 [Bouke van der Bijl] Add Python includes to path before depickling 
broadcast values


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3776f2f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3776f2f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3776f2f2

Branch: refs/heads/master
Commit: 3776f2f283842543ff766398292532c6e94221cc
Parents: c05d11b
Author: Bouke van der Bijl boukevanderb...@gmail.com
Authored: Sat May 10 13:02:13 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Sat May 10 13:02:13 2014 -0700

--
 .../scala/org/apache/spark/api/python/PythonRDD.scala | 10 +-
 python/pyspark/worker.py  | 14 +++---
 2 files changed, 12 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3776f2f2/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fecd976..388b838 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -179,6 +179,11 @@ private[spark] class PythonRDD[T: ClassTag](
 dataOut.writeInt(split.index)
 // sparkFilesDir
 PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
+// Python includes (*.zip and *.egg files)
+dataOut.writeInt(pythonIncludes.length)
+for (include - pythonIncludes) {
+  PythonRDD.writeUTF(include, dataOut)
+}
 // Broadcast variables
 dataOut.writeInt(broadcastVars.length)
 for (broadcast - broadcastVars) {
@@ -186,11 +191,6 @@ private[spark] class PythonRDD[T: ClassTag](
   dataOut.writeInt(broadcast.value.length)
   dataOut.write(broadcast.value)
 }
-// Python includes (*.zip and *.egg files)
-dataOut.writeInt(pythonIncludes.length)
-for (include - pythonIncludes) {
-  PythonRDD.writeUTF(include, dataOut)
-}
 dataOut.flush()
 // Serialized command:
 dataOut.writeInt(command.length)

http://git-wip-us.apache.org/repos/asf/spark/blob/3776f2f2/python/pyspark/worker.py
--
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4c214ef..f43210c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -56,13 +56,6 @@ def main(infile, outfile):
 SparkFiles._root_directory = spark_files_dir
 SparkFiles._is_running_on_worker = True
 
-# fetch names and values of broadcast variables
-num_broadcast_variables = read_int(infile)
-for _ in range(num_broadcast_variables):
-bid = read_long(infile)
-value = pickleSer._read_with_length(infile)
-_broadcastRegistry[bid] = Broadcast(bid, value)
-
 # fetch names of includes (*.zip and *.egg files) and construct 
PYTHONPATH
 sys.path.append(spark_files_dir) # *.py files that were added will be 
copied here
 num_python_includes =  read_int(infile)
@@ -70,6 +63,13 @@ def main(infile, outfile):
 filename = utf8_deserializer.loads(infile)
 sys.path.append(os.path.join(spark_files_dir, filename))
 
+# fetch names and values of broadcast variables
+num_broadcast_variables = read_int(infile)
+for _ in range(num_broadcast_variables):
+bid = read_long(infile)
+value = pickleSer._read_with_length(infile)
+_broadcastRegistry[bid] = Broadcast(bid, value)
+
 command = pickleSer._read_with_length(infile)
 (func, deserializer, serializer) = command
 init_time = time.time()



git commit: [SPARK-1778] [SQL] Add 'limit' transformation to SchemaRDD.

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 a61b71cad - 7486474d6


[SPARK-1778] [SQL] Add 'limit' transformation to SchemaRDD.

Add `limit` transformation to `SchemaRDD`.

Author: Takuya UESHIN ues...@happy-camper.st

Closes #711 from ueshin/issues/SPARK-1778 and squashes the following commits:

33169df [Takuya UESHIN] Add 'limit' transformation to SchemaRDD.
(cherry picked from commit 8e94d2721a9d3d36697e13f8cc6567ae8aeee78b)

Signed-off-by: Patrick Wendell pwend...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7486474d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7486474d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7486474d

Branch: refs/heads/branch-1.0
Commit: 7486474d6b9809cd5a6664939343351db4da0e18
Parents: a61b71c
Author: Takuya UESHIN ues...@happy-camper.st
Authored: Sat May 10 12:03:27 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Sat May 10 12:03:44 2014 -0700

--
 .../src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 9 +
 .../src/test/scala/org/apache/spark/sql/DslQuerySuite.scala | 6 ++
 2 files changed, 15 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7486474d/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 34200be..2569815 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -179,6 +179,15 @@ class SchemaRDD(
 new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan))
 
   /**
+   * Limits the results by the given expressions.
+   * {{{
+   *   schemaRDD.limit(10)
+   * }}}
+   */
+  def limit(limitExpr: Expression): SchemaRDD =
+new SchemaRDD(sqlContext, Limit(limitExpr, logicalPlan))
+
+  /**
* Performs a grouping followed by an aggregation.
*
* {{{

http://git-wip-us.apache.org/repos/asf/spark/blob/7486474d/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index be0f4a4..92a707e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -71,6 +71,12 @@ class DslQuerySuite extends QueryTest {
   Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
   }
 
+  test(limit) {
+checkAnswer(
+  testData.limit(10),
+  testData.take(10).toSeq)
+  }
+
   test(average) {
 checkAnswer(
   testData2.groupBy()(Average('a)),



git commit: [SPARK-1690] Tolerating empty elements when saving Python RDD to text files

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 3776f2f28 - 6c2691d0a


[SPARK-1690] Tolerating empty elements when saving Python RDD to text files

Tolerate empty strings in PythonRDD

Author: Kan Zhang kzh...@apache.org

Closes #644 from kanzhang/SPARK-1690 and squashes the following commits:

c62ad33 [Kan Zhang] Adding Python doctest
473ec4b [Kan Zhang] [SPARK-1690] Tolerating empty elements when saving Python 
RDD to text files


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c2691d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c2691d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c2691d0

Branch: refs/heads/master
Commit: 6c2691d0a0ed46a8b8093e05a4708706cf187168
Parents: 3776f2f
Author: Kan Zhang kzh...@apache.org
Authored: Sat May 10 14:01:08 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Sat May 10 14:01:08 2014 -0700

--
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala   | 5 +++--
 python/pyspark/rdd.py| 8 
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6c2691d0/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 388b838..2971c27 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag](
   val obj = new Array[Byte](length)
   stream.readFully(obj)
   obj
+case 0 = Array.empty[Byte]
 case SpecialLengths.TIMING_DATA =
   // Timing data from worker
   val bootTime = stream.readLong()
@@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag](
 stream.readFully(update)
 accumulator += Collections.singletonList(update)
   }
-  Array.empty[Byte]
+  null
   }
 } catch {
 
@@ -143,7 +144,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
   var _nextObj = read()
 
-  def hasNext = _nextObj.length != 0
+  def hasNext = _nextObj != null
 }
 new InterruptibleIterator(context, stdoutIterator)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2691d0/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3a1c56a..4f74824 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -891,6 +891,14 @@ class RDD(object):
  from glob import glob
  ''.join(sorted(input(glob(tempFile.name + /part-*
 '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
+
+Empty lines are tolerated when saving to text files.
+
+ tempFile2 = NamedTemporaryFile(delete=True)
+ tempFile2.close()
+ sc.parallelize(['', 'foo', '', 'bar', 
'']).saveAsTextFile(tempFile2.name)
+ ''.join(sorted(input(glob(tempFile2.name + /part-*
+'\\n\\n\\nbar\\nfoo\\n'
 
 def func(split, iterator):
 for x in iterator:



git commit: fix broken in link in python docs

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 9fbb22c20 - 71ad53f81


fix broken in link in python docs

Author: Andy Konwinski andykonwin...@gmail.com

Closes #650 from andyk/python-docs-link-fix and squashes the following commits:

a1f9d51 [Andy Konwinski] fix broken in link in python docs
(cherry picked from commit c05d11bb307eaba40c5669da2d374c28debaa55a)

Signed-off-by: Patrick Wendell pwend...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71ad53f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71ad53f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71ad53f8

Branch: refs/heads/branch-1.0
Commit: 71ad53f8176fd9411edf04188a3fe0264777a781
Parents: 9fbb22c
Author: Andy Konwinski andykonwin...@gmail.com
Authored: Sat May 10 12:46:51 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Sat May 10 12:47:18 2014 -0700

--
 docs/python-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/71ad53f8/docs/python-programming-guide.md
--
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 6813963..39fb5f0 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -45,7 +45,7 @@ errors = logData.filter(is_error)
 
 PySpark will automatically ship these functions to executors, along with any 
objects that they reference.
 Instances of classes will be serialized and shipped to executors by PySpark, 
but classes themselves cannot be automatically distributed to executors.
-The [Standalone Use](#standalone-use) section describes how to ship code 
dependencies to executors.
+The [Standalone Use](#standalone-programs) section describes how to ship code 
dependencies to executors.
 
 In addition, PySpark fully supports interactive use---simply run 
`./bin/pyspark` to launch an interactive shell.
 



git commit: Revert Enabled incremental build that comes with sbt 0.13.2

2014-05-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 71ce7eb0e - 758e5439f


Revert Enabled incremental build that comes with sbt 0.13.2

This reverts commit 71ce7eb0e5878f0bafd64bdd201ae257a3bfe106.

I meant only to merge this into master. It's an experimental
build feature.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/758e5439
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/758e5439
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/758e5439

Branch: refs/heads/branch-1.0
Commit: 758e5439f3c4ee1335f1e777d5ca580da5d9556d
Parents: 71ce7eb
Author: Patrick Wendell pwend...@gmail.com
Authored: Sat May 10 21:08:53 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Sat May 10 21:08:53 2014 -0700

--
 project/SparkBuild.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/758e5439/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 62d9cb1..6ea30d0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -176,7 +176,7 @@ object SparkBuild extends Build {
 retrievePattern := [type]s/[artifact](-[revision])(-[classifier]).[ext],
 transitiveClassifiers in Scope.GlobalScope := Seq(sources),
 testListeners = target.map(t = Seq(new 
eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
-incOptions := incOptions.value.withNameHashing(true),
+
 // Fork new JVMs for tests and set Java options for those
 fork := true,
 javaOptions in Test += -Dspark.home= + sparkHome,