git commit: [SPARK-1745] Move interrupted flag from TaskContext constructor (minor)
Repository: spark Updated Branches: refs/heads/master 44dd57fb6 - c3f8b78c2 [SPARK-1745] Move interrupted flag from TaskContext constructor (minor) It makes little sense to start a TaskContext that is interrupted. Indeed, I searched for all use cases of it and didn't find a single instance in which `interrupted` is true on construction. This was inspired by reviewing #640, which adds an additional `@volatile var completed` that is similar. These are not the most urgent changes, but I wanted to push them out before I forget. Author: Andrew Or andrewo...@gmail.com Closes #675 from andrewor14/task-context and squashes the following commits: 9575e02 [Andrew Or] Add space 69455d1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into task-context c471490 [Andrew Or] Oops, removed one flag too many. Adding it back. 85311f8 [Andrew Or] Move interrupted flag from TaskContext constructor Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3f8b78c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3f8b78c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3f8b78c Branch: refs/heads/master Commit: c3f8b78c211df6c5adae74f37e39fb55baeff723 Parents: 44dd57f Author: Andrew Or andrewo...@gmail.com Authored: Thu May 8 12:13:07 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Thu May 8 12:13:07 2014 -0700 -- .../scala/org/apache/spark/TaskContext.scala| 20 +++- .../apache/spark/scheduler/ShuffleMapTask.scala | 3 +-- .../java/org/apache/spark/JavaAPISuite.java | 2 +- .../org/apache/spark/CacheManagerSuite.scala| 10 +++--- .../scala/org/apache/spark/PipedRDDSuite.scala | 4 +--- 5 files changed, 17 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c3f8b78c/core/src/main/scala/org/apache/spark/TaskContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 
fc48127..51f40c3 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -28,13 +28,12 @@ import org.apache.spark.executor.TaskMetrics */ @DeveloperApi class TaskContext( - val stageId: Int, - val partitionId: Int, - val attemptId: Long, - val runningLocally: Boolean = false, - @volatile var interrupted: Boolean = false, - private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty -) extends Serializable { +val stageId: Int, +val partitionId: Int, +val attemptId: Long, +val runningLocally: Boolean = false, +private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty) + extends Serializable { @deprecated(use partitionId, 0.8.1) def splitId = partitionId @@ -42,7 +41,10 @@ class TaskContext( // List of callback functions to execute when the task completes. @transient private val onCompleteCallbacks = new ArrayBuffer[() = Unit] - // Set to true when the task is completed, before the onCompleteCallbacks are executed. + // Whether the corresponding task has been killed. + @volatile var interrupted: Boolean = false + + // Whether the task has completed, before the onCompleteCallbacks are executed. 
@volatile var completed: Boolean = false /** @@ -58,6 +60,6 @@ class TaskContext( def executeOnCompleteCallbacks() { completed = true // Process complete callbacks in the reverse order of registration -onCompleteCallbacks.reverse.foreach{_()} +onCompleteCallbacks.reverse.foreach { _() } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3f8b78c/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 2259df0..4b0324f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -23,7 +23,6 @@ import java.io._ import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.HashMap -import scala.util.Try import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics @@ -70,7 +69,7 @@ private[spark] object ShuffleMapTask { } // Since both the JarSet and FileSet have the same format this is used for both. - def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = { + def deserializeFileSet(bytes: Array[Byte]): HashMap[String, Long] = { val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) val objIn = new
git commit: [SPARK-1688] Propagate PySpark worker stderr to driver
Repository: spark Updated Branches: refs/heads/branch-1.0 0759ee790 -> 82c8e89c9 [SPARK-1688] Propagate PySpark worker stderr to driver When at least one of the following conditions is true, PySpark cannot be loaded: 1. PYTHONPATH is not set 2. PYTHONPATH does not contain the python directory (or jar, in the case of YARN) 3. The jar does not contain pyspark files (YARN) 4. The jar does not contain py4j files (YARN) However, we currently throw the same random `java.io.EOFException` for all of the above cases, when trying to read from the python daemon's output. This message is super unhelpful. This PR includes the python stderr and the PYTHONPATH in the exception propagated to the driver. Now, the exception message looks something like: ``` Error from python worker: : No module named pyspark PYTHONPATH was: /path/to/spark/python:/path/to/some/jar java.io.EOFException stack trace ``` whereas before it was just ``` java.io.EOFException stack trace ``` Author: Andrew Or andrewo...@gmail.com Closes #603 from andrewor14/pyspark-exception and squashes the following commits: 10d65d3 [Andrew Or] Throwable -> Exception, worker -> daemon 862d1d7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception a5ed798 [Andrew Or] Use block string and interpolation instead of var (minor) cc09c45 [Andrew Or] Account for the fact that the python daemon may not have terminated yet 444f019 [Andrew Or] Use the new RedirectThread + include system PYTHONPATH aab00ae [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception 0cc2402 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception 783efe2 [Andrew Or] Make python daemon stderr indentation consistent 9524172 [Andrew Or] Avoid potential NPE / error stream contention + Move things around 29f9688 [Andrew Or] Add back original exception type e92d36b [Andrew Or] Include python worker stderr in the exception propagated to the driver 7c69360 [Andrew Or] Merge branch 
'master' of github.com:apache/spark into pyspark-exception cdbc185 [Andrew Or] Fix python attribute not found exception when PYTHONPATH is not set dcc0353 [Andrew Or] Check both python and system environment variables for PYTHONPATH 6c09c21 [Andrew Or] Validate PYTHONPATH and PySpark modules before starting python workers (cherry picked from commit 5200872243aa5906dc8a06772e61d75f19557aac) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82c8e89c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82c8e89c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82c8e89c Branch: refs/heads/branch-1.0 Commit: 82c8e89c9581c45c7878b8f406cf3d90d4b0d74c Parents: 0759ee7 Author: Andrew Or andrewo...@gmail.com Authored: Wed May 7 14:35:22 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed May 7 14:35:37 2014 -0700 -- .../apache/spark/api/python/PythonUtils.scala | 27 +++- .../spark/api/python/PythonWorkerFactory.scala | 136 --- .../org/apache/spark/deploy/PythonRunner.scala | 24 +--- .../scala/org/apache/spark/util/Utils.scala | 37 + 4 files changed, 123 insertions(+), 101 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/82c8e89c/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index cf69fa1..6d3e257 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.python -import java.io.File +import java.io.{File, InputStream, IOException, OutputStream} import scala.collection.mutable.ArrayBuffer @@ -40,3 +40,28 @@ private[spark] object PythonUtils { paths.filter(_ != ).mkString(File.pathSeparator) } } + + +/** + * A 
utility class to redirect the child process's stdout or stderr. + */ +private[spark] class RedirectThread( +in: InputStream, +out: OutputStream, +name: String) + extends Thread(name) { + + setDaemon(true) + override def run() { +scala.util.control.Exception.ignoring(classOf[IOException]) { + // FIXME: We copy the stream on the level of bytes to avoid encoding problems. + val buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { +out.write(buf, 0, len) +out.flush() +len = in.read(buf) + } +} + } +}
git commit: [SPARK-1743][MLLIB] add loadLibSVMFile and saveAsLibSVMFile to pyspark
Repository: spark Updated Branches: refs/heads/branch-1.0 879bd - bb90e87f6 [SPARK-1743][MLLIB] add loadLibSVMFile and saveAsLibSVMFile to pyspark Make loading/saving labeled data easier for pyspark users. Also changed type check in `SparseVector` to allow numpy integers. Author: Xiangrui Meng m...@databricks.com Closes #672 from mengxr/pyspark-mllib-util and squashes the following commits: 2943fa7 [Xiangrui Meng] format docs d61668d [Xiangrui Meng] add loadLibSVMFile and saveAsLibSVMFile to pyspark (cherry picked from commit 3188553f73970270717a7fee4a116e29ad4becc9) Signed-off-by: Patrick Wendell pwend...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb90e87f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb90e87f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb90e87f Branch: refs/heads/branch-1.0 Commit: bb90e87f6a1cad72865749692d46e5ad6c7d93d7 Parents: 879 Author: Xiangrui Meng m...@databricks.com Authored: Wed May 7 16:01:11 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Wed May 7 16:01:18 2014 -0700 -- python/pyspark/mllib/linalg.py | 3 +- python/pyspark/mllib/util.py | 177 2 files changed, 178 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bb90e87f/python/pyspark/mllib/linalg.py -- diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 0aa3a51..7511ca7 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -49,8 +49,7 @@ class SparseVector(object): print SparseVector(4, [1, 3], [1.0, 5.5]) [1: 1.0, 3: 5.5] -assert type(size) == int, first argument must be an int -self.size = size +self.size = int(size) assert 1 = len(args) = 2, must pass either 2 or 3 arguments if len(args) == 1: pairs = args[0] http://git-wip-us.apache.org/repos/asf/spark/blob/bb90e87f/python/pyspark/mllib/util.py -- diff --git a/python/pyspark/mllib/util.py 
b/python/pyspark/mllib/util.py new file mode 100644 index 000..50d0cdd --- /dev/null +++ b/python/pyspark/mllib/util.py @@ -0,0 +1,177 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import numpy as np + +from pyspark.mllib.linalg import Vectors, SparseVector +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib._common import _convert_vector + +class MLUtils: + +Helper methods to load, save and pre-process data used in MLlib. + + +@staticmethod +def _parse_libsvm_line(line, multiclass): + +Parses a line in LIBSVM format into (label, indices, values). + +items = line.split(None) +label = float(items[0]) +if not multiclass: +label = 1.0 if label 0.5 else 0.0 +nnz = len(items) - 1 +indices = np.zeros(nnz, dtype=np.int32) +values = np.zeros(nnz) +for i in xrange(nnz): +index, value = items[1 + i].split(:) +indices[i] = int(index) - 1 +values[i] = float(value) +return label, indices, values + + +@staticmethod +def _convert_labeled_point_to_libsvm(p): +Converts a LabeledPoint to a string in LIBSVM format. 
+items = [str(p.label)] +v = _convert_vector(p.features) +if type(v) == np.ndarray: +for i in xrange(len(v)): +items.append(str(i + 1) + : + str(v[i])) +elif type(v) == SparseVector: +nnz = len(v.indices) +for i in xrange(nnz): +items.append(str(v.indices[i] + 1) + : + str(v.values[i])) +else: +raise TypeError(_convert_labeled_point_to_libsvm needs either ndarray or SparseVector + but got % type(v)) +return .join(items) + + +@staticmethod +def
git commit: Fixing typo in als.py
Repository: spark Updated Branches: refs/heads/branch-1.0 6f701ff55 -> 98944a973 Fixing typo in als.py: XtY should be Xty. Author: Evan Sparks evan.spa...@gmail.com Closes #696 from etrain/patch-2 and squashes the following commits: 634cb8d [Evan Sparks] Fixing typo in als.py Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98944a97 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98944a97 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98944a97 Branch: refs/heads/branch-1.0 Commit: 98944a9734389cd4400516a1eb3afa5376f44927 Parents: 6f701ff Author: Evan Sparks evan.spa...@gmail.com Authored: Thu May 8 13:07:30 2014 -0700 Committer: Shivaram Venkataraman shiva...@eecs.berkeley.edu Committed: Thu May 8 16:49:33 2014 -0700 -- examples/src/main/python/als.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/98944a97/examples/src/main/python/als.py -- diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py index 33700ab..01552dc 100755 --- a/examples/src/main/python/als.py +++ b/examples/src/main/python/als.py @@ -38,7 +38,7 @@ def update(i, vec, mat, ratings): ff = mat.shape[1] XtX = mat.T * mat -XtY = mat.T * ratings[i, :].T +Xty = mat.T * ratings[i, :].T for j in range(ff): XtX[j,j] += LAMBDA * uu
git commit: [HOTFIX] SPARK-1637: There are some Streaming examples added after the PR #571 was last updated.
Repository: spark Updated Branches: refs/heads/branch-1.0 51e277557 - ade47562b [HOTFIX] SPARK-1637: There are some Streaming examples added after the PR #571 was last updated. This resulted in Compilation Errors. cc @mateiz project not compiling currently. Author: Sandeep sand...@techaddict.me Closes #673 from techaddict/SPARK-1637-HOTFIX and squashes the following commits: b512f4f [Sandeep] [SPARK-1637][HOTFIX] There are some Streaming examples added after the PR #571 was last updated. This resulted in Compilation Errors. (cherry picked from commit fdae095de2daa1fc3b343c05e515235756d856a4) Signed-off-by: Patrick Wendell pwend...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ade47562 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ade47562 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ade47562 Branch: refs/heads/branch-1.0 Commit: ade47562b75c2056359ac63e843360677a0deab1 Parents: 51e2775 Author: Sandeep sand...@techaddict.me Authored: Tue May 6 21:55:05 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Tue May 6 21:55:16 2014 -0700 -- .../examples/streaming/JavaCustomReceiver.java | 151 ++ .../streaming/examples/JavaCustomReceiver.java | 153 --- .../examples/streaming/CustomReceiver.scala | 108 + .../streaming/examples/CustomReceiver.scala | 108 - 4 files changed, 259 insertions(+), 261 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ade47562/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java new file mode 100644 index 000..7f558f3 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import com.google.common.collect.Lists; + +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.receiver.Receiver; +import scala.Tuple2; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.ConnectException; +import java.net.Socket; +import java.util.regex.Pattern; + +/** + * Custom Receiver that receives data over a socket. Received bytes is interpreted as + * text and \n delimited lines are considered as records. They are then counted and printed. + * + * Usage: JavaCustomReceiver master hostname port + * master is the Spark master URL. In local mode, master should be 'local[n]' with n 1. 
+ * hostname and port of the TCP server that Spark Streaming would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + *`$ nc -lk ` + * and then run the example + *`$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] localhost ` + */ + +public class JavaCustomReceiver extends ReceiverString { + private static final Pattern SPACE = Pattern.compile( ); + + public static void main(String[] args) { +if (args.length 3) { + System.err.println(Usage: JavaNetworkWordCount master hostname port\n + + In local mode, master should be 'local[n]' with n 1); + System.exit(1); +} + +StreamingExamples.setStreamingLogLevels(); + +// Create
git commit: [SQL] Improve SparkSQL Aggregates
Repository: spark Updated Branches: refs/heads/master 6ed7e2cd0 - 19c8fb02b [SQL] Improve SparkSQL Aggregates * Add native min/max (was using hive before). * Handle nulls correctly in Avg and Sum. Author: Michael Armbrust mich...@databricks.com Closes #683 from marmbrus/aggFixes and squashes the following commits: 64fe30b [Michael Armbrust] Improve SparkSQL Aggregates * Add native min/max (was using hive before). * Handle nulls correctly in Avg and Sum. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19c8fb02 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19c8fb02 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19c8fb02 Branch: refs/heads/master Commit: 19c8fb02bc2c2f76c3c45bfff4b8d093be9d7c66 Parents: 6ed7e2c Author: Michael Armbrust mich...@databricks.com Authored: Thu May 8 01:08:43 2014 -0400 Committer: Reynold Xin r...@apache.org Committed: Thu May 8 01:08:43 2014 -0400 -- .../apache/spark/sql/catalyst/SqlParser.scala | 4 + .../sql/catalyst/expressions/aggregates.scala | 85 +--- .../org/apache/spark/sql/SQLQuerySuite.scala| 7 ++ .../scala/org/apache/spark/sql/TestData.scala | 10 +++ 4 files changed, 96 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/19c8fb02/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 8c76a3a..b3a3a1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -114,6 +114,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val JOIN = Keyword(JOIN) protected val LEFT = Keyword(LEFT) protected val LIMIT = Keyword(LIMIT) + protected val MAX = Keyword(MAX) + protected val MIN = Keyword(MIN) 
protected val NOT = Keyword(NOT) protected val NULL = Keyword(NULL) protected val ON = Keyword(ON) @@ -318,6 +320,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { COUNT ~ ( ~ DISTINCT ~ expression ~ ) ^^ { case exp = CountDistinct(exp :: Nil) } | FIRST ~ ( ~ expression ~ ) ^^ { case exp = First(exp) } | AVG ~ ( ~ expression ~ ) ^^ { case exp = Average(exp) } | +MIN ~ ( ~ expression ~ ) ^^ { case exp = Min(exp) } | +MAX ~ ( ~ expression ~ ) ^^ { case exp = Max(exp) } | IF ~ ( ~ expression ~ , ~ expression ~ , ~ expression ~ ) ^^ { case c ~ , ~ t ~ , ~ f = If(c,t,f) } | http://git-wip-us.apache.org/repos/asf/spark/blob/19c8fb02/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index b152f95..d37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -86,6 +86,67 @@ abstract class AggregateFunction override def newInstance() = makeCopy(productIterator.map { case a: AnyRef = a }.toArray) } +case class Min(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { + override def references = child.references + override def nullable = child.nullable + override def dataType = child.dataType + override def toString = sMIN($child) + + override def asPartial: SplitEvaluation = { +val partialMin = Alias(Min(child), PartialMin)() +SplitEvaluation(Min(partialMin.toAttribute), partialMin :: Nil) + } + + override def newInstance() = new MinFunction(child, this) +} + +case class MinFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { + def this() = this(null, null) // Required for serialization. 
+ + var currentMin: Any = _ + + override def update(input: Row): Unit = { +if (currentMin == null) { + currentMin = expr.eval(input) +} else if(GreaterThan(Literal(currentMin, expr.dataType), expr).eval(input) == true) { + currentMin = expr.eval(input) +} + } + + override def eval(input: Row): Any = currentMin +} + +case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { + override def references = child.references + override def nullable = child.nullable + override def dataType = child.dataType + override def toString = sMAX($child) + + override def
git commit: MLlib documentation fix
Repository: spark Updated Branches: refs/heads/master 322b1808d -> d38febee4 MLlib documentation fix Fixed the documentation to reflect that `loadLibSVMData` has been renamed to `loadLibSVMFile`. Author: DB Tsai dbt...@alpinenow.com Closes #703 from dbtsai/dbtsai-docfix and squashes the following commits: 71dd508 [DB Tsai] loadLibSVMData is changed to loadLibSVMFile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d38febee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d38febee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d38febee Branch: refs/heads/master Commit: d38febee46ed156b0c8ec64757db6c290e488421 Parents: 322b180 Author: DB Tsai dbt...@alpinenow.com Authored: Thu May 8 17:52:32 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Thu May 8 17:52:32 2014 -0700 -- docs/mllib-basics.md | 8 docs/mllib-linear-methods.md | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d38febee/docs/mllib-basics.md -- diff --git a/docs/mllib-basics.md b/docs/mllib-basics.md index 7043088..aa9321a 100644 --- a/docs/mllib-basics.md +++ b/docs/mllib-basics.md @@ -184,7 +184,7 @@ After loading, the feature indices are converted to zero-based. div class=codetabs div data-lang=scala markdown=1 -[`MLUtils.loadLibSVMData`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$) reads training +[`MLUtils.loadLibSVMFile`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$) reads training examples stored in LIBSVM format. 
{% highlight scala %} @@ -192,12 +192,12 @@ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD -val training: RDD[LabeledPoint] = MLUtils.loadLibSVMData(sc, mllib/data/sample_libsvm_data.txt) +val training: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, mllib/data/sample_libsvm_data.txt) {% endhighlight %} /div div data-lang=java markdown=1 -[`MLUtils.loadLibSVMData`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$) reads training +[`MLUtils.loadLibSVMFile`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$) reads training examples stored in LIBSVM format. {% highlight java %} @@ -205,7 +205,7 @@ import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.rdd.RDDimport; -RDDLabeledPoint training = MLUtils.loadLibSVMData(jsc, mllib/data/sample_libsvm_data.txt); +RDDLabeledPoint training = MLUtils.loadLibSVMFile(jsc, mllib/data/sample_libsvm_data.txt); {% endhighlight %} /div /div http://git-wip-us.apache.org/repos/asf/spark/blob/d38febee/docs/mllib-linear-methods.md -- diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index 40b7a7f..eff617d 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -186,7 +186,7 @@ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLUtils // Load training data in LIBSVM format. -val data = MLUtils.loadLibSVMData(sc, mllib/data/sample_libsvm_data.txt) +val data = MLUtils.loadLibSVMFile(sc, mllib/data/sample_libsvm_data.txt) // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
git commit: Add Python includes to path before depickling broadcast values
Repository: spark Updated Branches: refs/heads/branch-1.0 71ad53f81 - 2a669a70d Add Python includes to path before depickling broadcast values This fixes https://issues.apache.org/jira/browse/SPARK-1731 by adding the Python includes to the PYTHONPATH before depickling the broadcast values @airhorns Author: Bouke van der Bijl boukevanderb...@gmail.com Closes #656 from bouk/python-includes-before-broadcast and squashes the following commits: 7b0dfe4 [Bouke van der Bijl] Add Python includes to path before depickling broadcast values (cherry picked from commit 3776f2f283842543ff766398292532c6e94221cc) Signed-off-by: Patrick Wendell pwend...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a669a70 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a669a70 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a669a70 Branch: refs/heads/branch-1.0 Commit: 2a669a70d09c33480d9db2d15e4a2f69dd214c77 Parents: 71ad53f Author: Bouke van der Bijl boukevanderb...@gmail.com Authored: Sat May 10 13:02:13 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Sat May 10 13:02:23 2014 -0700 -- .../scala/org/apache/spark/api/python/PythonRDD.scala | 10 +- python/pyspark/worker.py | 14 +++--- 2 files changed, 12 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2a669a70/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index fecd976..388b838 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -179,6 +179,11 @@ private[spark] class PythonRDD[T: ClassTag]( dataOut.writeInt(split.index) // sparkFilesDir PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut) +// Python includes (*.zip and *.egg files) 
+dataOut.writeInt(pythonIncludes.length) +for (include - pythonIncludes) { + PythonRDD.writeUTF(include, dataOut) +} // Broadcast variables dataOut.writeInt(broadcastVars.length) for (broadcast - broadcastVars) { @@ -186,11 +191,6 @@ private[spark] class PythonRDD[T: ClassTag]( dataOut.writeInt(broadcast.value.length) dataOut.write(broadcast.value) } -// Python includes (*.zip and *.egg files) -dataOut.writeInt(pythonIncludes.length) -for (include - pythonIncludes) { - PythonRDD.writeUTF(include, dataOut) -} dataOut.flush() // Serialized command: dataOut.writeInt(command.length) http://git-wip-us.apache.org/repos/asf/spark/blob/2a669a70/python/pyspark/worker.py -- diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 4c214ef..f43210c 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -56,13 +56,6 @@ def main(infile, outfile): SparkFiles._root_directory = spark_files_dir SparkFiles._is_running_on_worker = True -# fetch names and values of broadcast variables -num_broadcast_variables = read_int(infile) -for _ in range(num_broadcast_variables): -bid = read_long(infile) -value = pickleSer._read_with_length(infile) -_broadcastRegistry[bid] = Broadcast(bid, value) - # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH sys.path.append(spark_files_dir) # *.py files that were added will be copied here num_python_includes = read_int(infile) @@ -70,6 +63,13 @@ def main(infile, outfile): filename = utf8_deserializer.loads(infile) sys.path.append(os.path.join(spark_files_dir, filename)) +# fetch names and values of broadcast variables +num_broadcast_variables = read_int(infile) +for _ in range(num_broadcast_variables): +bid = read_long(infile) +value = pickleSer._read_with_length(infile) +_broadcastRegistry[bid] = Broadcast(bid, value) + command = pickleSer._read_with_length(infile) (func, deserializer, serializer) = command init_time = time.time()
git commit: SPARK-1686: keep schedule() calling in the main thread
Repository: spark Updated Branches: refs/heads/branch-1.0 8202276c9 - adf8cdd0b SPARK-1686: keep schedule() calling in the main thread https://issues.apache.org/jira/browse/SPARK-1686 moved from original JIRA (by @markhamstra): In deploy.master.Master, the completeRecovery method is the last thing to be called when a standalone Master is recovering from failure. It is responsible for resetting some state, relaunching drivers, and eventually resuming its scheduling duties. There are currently four places in Master.scala where completeRecovery is called. Three of them are from within the actor's receive method, and aren't problems. The last starts from within receive when the ElectedLeader message is received, but the actual completeRecovery() call is made from the Akka scheduler. That means that it will execute on a different scheduler thread, and Master itself will end up running (i.e., schedule() ) from that Akka scheduler thread. In this PR, I added a new master message TriggerSchedule to trigger the local call of schedule() in the scheduler thread Author: CodingCat zhunans...@gmail.com Closes #639 from CodingCat/SPARK-1686 and squashes the following commits: 81bb4ca [CodingCat] rename variable 69e0a2a [CodingCat] style fix 36a2ac0 [CodingCat] address Aaron's comments ec9b7bb [CodingCat] address the comments 02b37ca [CodingCat] keep schedule() calling in the main thread (cherry picked from commit 2f452cbaf35dbc609ab48ec0ee5e3dd7b6b9b790) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adf8cdd0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adf8cdd0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adf8cdd0 Branch: refs/heads/branch-1.0 Commit: adf8cdd0b29731325f08552d050c43fe1bbd724f Parents: 8202276 Author: CodingCat zhunans...@gmail.com Authored: Fri May 9 21:50:23 2014 -0700 Committer: Aaron Davidson aa...@databricks.com 
Committed: Fri May 9 21:52:40 2014 -0700 -- .../org/apache/spark/deploy/master/Master.scala | 15 --- 1 file changed, 12 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/adf8cdd0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fdb633b..f254f55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -104,6 +104,8 @@ private[spark] class Master( var leaderElectionAgent: ActorRef = _ + private var recoveryCompletionTask: Cancellable = _ + // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. @@ -152,6 +154,10 @@ private[spark] class Master( } override def postStop() { +// prevent the CompleteRecovery message sending to restarted master +if (recoveryCompletionTask != null) { + recoveryCompletionTask.cancel() +} webUi.stop() fileSystemsUsed.foreach(_.close()) masterMetricsSystem.stop() @@ -171,10 +177,13 @@ private[spark] class Master( logInfo(I have been elected leader! New state: + state) if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedDrivers, storedWorkers) -context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } +recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, + CompleteRecovery) } } +case CompleteRecovery = completeRecovery() + case RevokedLeadership = { logError(Leadership has been revoked -- master shutting down.) System.exit(0) @@ -465,7 +474,7 @@ private[spark] class Master( * Schedule the currently available resources among waiting apps. 
This method will be called * every time a new app joins or resource availability changes. */ - def schedule() { + private def schedule() { if (state != RecoveryState.ALIVE) { return } // First schedule drivers, they take strict precedence over applications @@ -485,7 +494,7 @@ private[spark] class Master( // Try to spread out each app among all the nodes, until it has all its cores for (app - waitingApps if app.coresLeft 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(canUse(app,
git commit: Add Python includes to path before depickling broadcast values
Repository: spark Updated Branches: refs/heads/master c05d11bb3 - 3776f2f28 Add Python includes to path before depickling broadcast values This fixes https://issues.apache.org/jira/browse/SPARK-1731 by adding the Python includes to the PYTHONPATH before depickling the broadcast values @airhorns Author: Bouke van der Bijl boukevanderb...@gmail.com Closes #656 from bouk/python-includes-before-broadcast and squashes the following commits: 7b0dfe4 [Bouke van der Bijl] Add Python includes to path before depickling broadcast values Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3776f2f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3776f2f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3776f2f2 Branch: refs/heads/master Commit: 3776f2f283842543ff766398292532c6e94221cc Parents: c05d11b Author: Bouke van der Bijl boukevanderb...@gmail.com Authored: Sat May 10 13:02:13 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Sat May 10 13:02:13 2014 -0700 -- .../scala/org/apache/spark/api/python/PythonRDD.scala | 10 +- python/pyspark/worker.py | 14 +++--- 2 files changed, 12 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3776f2f2/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index fecd976..388b838 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -179,6 +179,11 @@ private[spark] class PythonRDD[T: ClassTag]( dataOut.writeInt(split.index) // sparkFilesDir PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut) +// Python includes (*.zip and *.egg files) +dataOut.writeInt(pythonIncludes.length) +for (include - pythonIncludes) { + PythonRDD.writeUTF(include, dataOut) +} // 
Broadcast variables dataOut.writeInt(broadcastVars.length) for (broadcast - broadcastVars) { @@ -186,11 +191,6 @@ private[spark] class PythonRDD[T: ClassTag]( dataOut.writeInt(broadcast.value.length) dataOut.write(broadcast.value) } -// Python includes (*.zip and *.egg files) -dataOut.writeInt(pythonIncludes.length) -for (include - pythonIncludes) { - PythonRDD.writeUTF(include, dataOut) -} dataOut.flush() // Serialized command: dataOut.writeInt(command.length) http://git-wip-us.apache.org/repos/asf/spark/blob/3776f2f2/python/pyspark/worker.py -- diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 4c214ef..f43210c 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -56,13 +56,6 @@ def main(infile, outfile): SparkFiles._root_directory = spark_files_dir SparkFiles._is_running_on_worker = True -# fetch names and values of broadcast variables -num_broadcast_variables = read_int(infile) -for _ in range(num_broadcast_variables): -bid = read_long(infile) -value = pickleSer._read_with_length(infile) -_broadcastRegistry[bid] = Broadcast(bid, value) - # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH sys.path.append(spark_files_dir) # *.py files that were added will be copied here num_python_includes = read_int(infile) @@ -70,6 +63,13 @@ def main(infile, outfile): filename = utf8_deserializer.loads(infile) sys.path.append(os.path.join(spark_files_dir, filename)) +# fetch names and values of broadcast variables +num_broadcast_variables = read_int(infile) +for _ in range(num_broadcast_variables): +bid = read_long(infile) +value = pickleSer._read_with_length(infile) +_broadcastRegistry[bid] = Broadcast(bid, value) + command = pickleSer._read_with_length(infile) (func, deserializer, serializer) = command init_time = time.time()
git commit: [SPARK-1778] [SQL] Add 'limit' transformation to SchemaRDD.
Repository: spark Updated Branches: refs/heads/branch-1.0 a61b71cad - 7486474d6 [SPARK-1778] [SQL] Add 'limit' transformation to SchemaRDD. Add `limit` transformation to `SchemaRDD`. Author: Takuya UESHIN ues...@happy-camper.st Closes #711 from ueshin/issues/SPARK-1778 and squashes the following commits: 33169df [Takuya UESHIN] Add 'limit' transformation to SchemaRDD. (cherry picked from commit 8e94d2721a9d3d36697e13f8cc6567ae8aeee78b) Signed-off-by: Patrick Wendell pwend...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7486474d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7486474d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7486474d Branch: refs/heads/branch-1.0 Commit: 7486474d6b9809cd5a6664939343351db4da0e18 Parents: a61b71c Author: Takuya UESHIN ues...@happy-camper.st Authored: Sat May 10 12:03:27 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Sat May 10 12:03:44 2014 -0700 -- .../src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 9 + .../src/test/scala/org/apache/spark/sql/DslQuerySuite.scala | 6 ++ 2 files changed, 15 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7486474d/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 34200be..2569815 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -179,6 +179,15 @@ class SchemaRDD( new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan)) /** + * Limits the results by the given expressions. + * {{{ + * schemaRDD.limit(10) + * }}} + */ + def limit(limitExpr: Expression): SchemaRDD = +new SchemaRDD(sqlContext, Limit(limitExpr, logicalPlan)) + + /** * Performs a grouping followed by an aggregation. 
* * {{{ http://git-wip-us.apache.org/repos/asf/spark/blob/7486474d/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index be0f4a4..92a707e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -71,6 +71,12 @@ class DslQuerySuite extends QueryTest { Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2))) } + test(limit) { +checkAnswer( + testData.limit(10), + testData.take(10).toSeq) + } + test(average) { checkAnswer( testData2.groupBy()(Average('a)),
git commit: [SPARK-1690] Tolerating empty elements when saving Python RDD to text files
Repository: spark Updated Branches: refs/heads/master 3776f2f28 - 6c2691d0a [SPARK-1690] Tolerating empty elements when saving Python RDD to text files Tolerate empty strings in PythonRDD Author: Kan Zhang kzh...@apache.org Closes #644 from kanzhang/SPARK-1690 and squashes the following commits: c62ad33 [Kan Zhang] Adding Python doctest 473ec4b [Kan Zhang] [SPARK-1690] Tolerating empty elements when saving Python RDD to text files Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c2691d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c2691d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c2691d0 Branch: refs/heads/master Commit: 6c2691d0a0ed46a8b8093e05a4708706cf187168 Parents: 3776f2f Author: Kan Zhang kzh...@apache.org Authored: Sat May 10 14:01:08 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Sat May 10 14:01:08 2014 -0700 -- .../main/scala/org/apache/spark/api/python/PythonRDD.scala | 5 +++-- python/pyspark/rdd.py| 8 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6c2691d0/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 388b838..2971c27 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag]( val obj = new Array[Byte](length) stream.readFully(obj) obj +case 0 = Array.empty[Byte] case SpecialLengths.TIMING_DATA = // Timing data from worker val bootTime = stream.readLong() @@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag]( stream.readFully(update) accumulator += Collections.singletonList(update) } - Array.empty[Byte] + null } } catch { @@ -143,7 +144,7 @@ private[spark] 
class PythonRDD[T: ClassTag]( var _nextObj = read() - def hasNext = _nextObj.length != 0 + def hasNext = _nextObj != null } new InterruptibleIterator(context, stdoutIterator) } http://git-wip-us.apache.org/repos/asf/spark/blob/6c2691d0/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 3a1c56a..4f74824 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -891,6 +891,14 @@ class RDD(object): from glob import glob ''.join(sorted(input(glob(tempFile.name + /part-* '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' + +Empty lines are tolerated when saving to text files. + + tempFile2 = NamedTemporaryFile(delete=True) + tempFile2.close() + sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) + ''.join(sorted(input(glob(tempFile2.name + /part-* +'\\n\\n\\nbar\\nfoo\\n' def func(split, iterator): for x in iterator:
git commit: fix broken link in python docs
Repository: spark Updated Branches: refs/heads/branch-1.0 9fbb22c20 - 71ad53f81 fix broken link in python docs Author: Andy Konwinski andykonwin...@gmail.com Closes #650 from andyk/python-docs-link-fix and squashes the following commits: a1f9d51 [Andy Konwinski] fix broken link in python docs (cherry picked from commit c05d11bb307eaba40c5669da2d374c28debaa55a) Signed-off-by: Patrick Wendell pwend...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71ad53f8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71ad53f8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71ad53f8 Branch: refs/heads/branch-1.0 Commit: 71ad53f8176fd9411edf04188a3fe0264777a781 Parents: 9fbb22c Author: Andy Konwinski andykonwin...@gmail.com Authored: Sat May 10 12:46:51 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Sat May 10 12:47:18 2014 -0700 -- docs/python-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/71ad53f8/docs/python-programming-guide.md -- diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index 6813963..39fb5f0 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -45,7 +45,7 @@ errors = logData.filter(is_error) PySpark will automatically ship these functions to executors, along with any objects that they reference. Instances of classes will be serialized and shipped to executors by PySpark, but classes themselves cannot be automatically distributed to executors. -The [Standalone Use](#standalone-use) section describes how to ship code dependencies to executors. +The [Standalone Use](#standalone-programs) section describes how to ship code dependencies to executors. In addition, PySpark fully supports interactive use---simply run `./bin/pyspark` to launch an interactive shell.
git commit: Revert "Enabled incremental build that comes with sbt 0.13.2"
Repository: spark Updated Branches: refs/heads/branch-1.0 71ce7eb0e - 758e5439f Revert "Enabled incremental build that comes with sbt 0.13.2" This reverts commit 71ce7eb0e5878f0bafd64bdd201ae257a3bfe106. I meant only to merge this into master. It's an experimental build feature. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/758e5439 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/758e5439 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/758e5439 Branch: refs/heads/branch-1.0 Commit: 758e5439f3c4ee1335f1e777d5ca580da5d9556d Parents: 71ce7eb Author: Patrick Wendell pwend...@gmail.com Authored: Sat May 10 21:08:53 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Sat May 10 21:08:53 2014 -0700 -- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/758e5439/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 62d9cb1..6ea30d0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -176,7 +176,7 @@ object SparkBuild extends Build { retrievePattern := [type]s/[artifact](-[revision])(-[classifier]).[ext], transitiveClassifiers in Scope.GlobalScope := Seq(sources), testListeners = target.map(t = Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), -incOptions := incOptions.value.withNameHashing(true), + // Fork new JVMs for tests and set Java options for those fork := true, javaOptions in Test += -Dspark.home= + sparkHome,