Repository: spark
Updated Branches:
  refs/heads/master a6cd31108 -> d614967b0


[SPARK-2627] [PySpark] have the build enforce PEP 8 automatically

As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), 
we'd like Python code to automatically be checked for PEP 8 compliance by 
Jenkins. This pull request aims to do that.

Notes:
* We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the 
build server.
* I'm expecting tests to fail now that PEP 8 compliance is being checked as 
part of the build. I'm fine with cleaning up any remaining PEP 8 violations as 
part of this pull request.
* I did not understand why the RAT and scalastyle reports are saved to text
files. I did the same for the PEP 8 check, but only so that its console output
matches the style of the RAT and scalastyle checks. The PEP 8 report is
removed right after the check completes.
* Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark)
guide will be submitted elsewhere, as I don't believe that text is part of the
Spark repo.
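
For illustration only, the check flow described in the notes above (capture the report in a file, print the pass/fail status before the report, always delete the report afterwards) might be sketched in Python as follows. This is not the code in this pull request, which is a bash script; the `run_style_check` helper, its parameters, and the use of a temporary file are assumptions made for the sketch.

```python
import os
import subprocess
import sys
import tempfile


def run_style_check(command, label="PEP 8", out=sys.stdout):
    """Run `command`, print the status line before the report, return the exit status.

    Hypothetical helper: the name and parameters are illustrative only.
    """
    # Capture the checker's output in a report file first, so the
    # pass/fail line can be printed ahead of the report itself.
    fd, report_path = tempfile.mkstemp(suffix="-report.txt")
    try:
        with os.fdopen(fd, "w") as report:
            status = subprocess.call(command, stdout=report, stderr=subprocess.STDOUT)
        if status != 0:
            out.write("%s checks failed.\n" % label)
            with open(report_path) as report:
                out.write(report.read())
        else:
            out.write("%s checks passed.\n" % label)
    finally:
        # Always delete the report, whether or not the check passed.
        os.remove(report_path)
    return status
```

A caller would pass the linter invocation as a list, for example `run_style_check([sys.executable, "dev/pep8.py", "./python"])`, and exit with the returned status so that the build fails when the check does.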

Author: Nicholas Chammas <nicholas.cham...@gmail.com>
Author: nchammas <nicholas.cham...@gmail.com>

Closes #1744 from nchammas/master and squashes the following commits:

274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes
983d963 [nchammas] Merge pull request #5 from apache/master
1db5314 [nchammas] Merge pull request #4 from apache/master
0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes
bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing
6db9a44 [nchammas] Merge pull request #3 from apache/master
7b4750e [Nicholas Chammas] merge upstream changes
91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks
44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files
b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily
bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes
9da347f [nchammas] Merge pull request #2 from apache/master
aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks
d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine
dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime
a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections
21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8
6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes
fe57ed0 [Nicholas Chammas] removing merge conflict backups
9c01d4c [nchammas] Merge pull request #1 from apache/master
9a66cb0 [Nicholas Chammas] resolving merge conflicts
a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes
beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status
723ed39 [Nicholas Chammas] always delete the report file
0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests
12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter
61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter
75ad552 [Nicholas Chammas] make check output style consistent


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d614967b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d614967b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d614967b

Branch: refs/heads/master
Commit: d614967b0bad1e6c5277d612602ec0a653a00258
Parents: a6cd311
Author: Nicholas Chammas <nicholas.cham...@gmail.com>
Authored: Wed Aug 6 12:58:24 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Wed Aug 6 12:58:24 2014 -0700

----------------------------------------------------------------------
 dev/lint-python                        |  60 ++++++++++++
 dev/lint-scala                         |  23 +++++
 dev/run-tests                          |  13 ++-
 dev/scalastyle                         |   2 +-
 python/pyspark/accumulators.py         |   7 ++
 python/pyspark/broadcast.py            |   1 +
 python/pyspark/conf.py                 |   1 +
 python/pyspark/context.py              |  25 ++---
 python/pyspark/daemon.py               |   5 +-
 python/pyspark/files.py                |   1 +
 python/pyspark/java_gateway.py         |   1 +
 python/pyspark/mllib/_common.py        |   5 +-
 python/pyspark/mllib/classification.py |   8 ++
 python/pyspark/mllib/clustering.py     |   3 +
 python/pyspark/mllib/linalg.py         |   2 +
 python/pyspark/mllib/random.py         |  14 +--
 python/pyspark/mllib/recommendation.py |   2 +
 python/pyspark/mllib/regression.py     |  12 +++
 python/pyspark/mllib/stat.py           |   1 +
 python/pyspark/mllib/tests.py          |  11 ++-
 python/pyspark/mllib/tree.py           |   4 +-
 python/pyspark/mllib/util.py           |   1 +
 python/pyspark/rdd.py                  |  22 +++--
 python/pyspark/rddsampler.py           |   4 +
 python/pyspark/resultiterable.py       |   2 +
 python/pyspark/serializers.py          |  21 +++-
 python/pyspark/shuffle.py              |  20 ++--
 python/pyspark/sql.py                  |  66 +++++++++----
 python/pyspark/storagelevel.py         |   1 +
 python/pyspark/tests.py                | 143 ++++++++++++++++------------
 python/test_support/userlibrary.py     |   2 +
 tox.ini                                |   1 +
 32 files changed, 348 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/dev/lint-python
----------------------------------------------------------------------
diff --git a/dev/lint-python b/dev/lint-python
new file mode 100755
index 0000000..4efddad
--- /dev/null
+++ b/dev/lint-python
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
+SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"
+PEP8_REPORT_PATH="$SPARK_ROOT_DIR/dev/pep8-report.txt"
+
+cd $SPARK_ROOT_DIR
+
+# Get pep8 at runtime so that we don't rely on it being installed on the build server.
+#+ See: https://github.com/apache/spark/pull/1744#issuecomment-50982162
+#+ TODOs:
+#+  - Dynamically determine latest release version of pep8 and use that.
+#+  - Download this from a more reliable source. (GitHub raw can be flaky, apparently. (?))
+PEP8_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pep8.py"
+PEP8_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py"
+
+curl --silent -o "$PEP8_SCRIPT_PATH" "$PEP8_SCRIPT_REMOTE_PATH"    
+curl_status=$?
+
+if [ $curl_status -ne 0 ]; then
+    echo "Failed to download pep8.py from \"$PEP8_SCRIPT_REMOTE_PATH\"."
+    exit $curl_status
+fi
+
+
+# There is no need to write this output to a file
+#+ first, but we do so so that the check status can
+#+ be output before the report, like with the
+#+ scalastyle and RAT checks.
+python $PEP8_SCRIPT_PATH ./python > "$PEP8_REPORT_PATH"
+pep8_status=${PIPESTATUS[0]} #$?
+
+if [ $pep8_status -ne 0 ]; then
+    echo "PEP 8 checks failed."
+    cat "$PEP8_REPORT_PATH"
+else
+    echo "PEP 8 checks passed."
+fi
+
+rm -f "$PEP8_REPORT_PATH"
+rm "$PEP8_SCRIPT_PATH"
+
+exit $pep8_status
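
The download-then-verify step at the top of `dev/lint-python` could look roughly like this in Python. It is a sketch under the assumption that a failed download should stop the check before any linting runs; `fetch_script` is a hypothetical helper, and only the URL comes from the script above.

```python
import os
import sys
import urllib.request

# URL taken from dev/lint-python above.
PEP8_SCRIPT_REMOTE_PATH = "https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py"


def fetch_script(url, dest_path):
    """Download `url` to `dest_path`; return True on success, False otherwise.

    Hypothetical helper, loosely modelled on the curl call and status check.
    """
    try:
        urllib.request.urlretrieve(url, dest_path)
    except OSError as exc:  # urllib.error.URLError is a subclass of OSError
        sys.stderr.write('Failed to download from "%s": %s\n' % (url, exc))
        # Remove any partial file so a later run does not pick it up.
        if os.path.exists(dest_path):
            os.remove(dest_path)
        return False
    return True
```

One difference worth keeping in mind: `urlretrieve` raises on HTTP error statuses, whereas `curl` without `--fail` exits with status 0 and saves the error page, so the bash status check above would mainly catch connection-level failures.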

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/dev/lint-scala
----------------------------------------------------------------------
diff --git a/dev/lint-scala b/dev/lint-scala
new file mode 100755
index 0000000..c676dfd
--- /dev/null
+++ b/dev/lint-scala
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
+SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"
+
+"$SCRIPT_DIR/scalastyle"

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/dev/run-tests
----------------------------------------------------------------------
diff --git a/dev/run-tests b/dev/run-tests
index d401c90..0e24515 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -66,16 +66,25 @@ fi
 set -e
 set -o pipefail
 
+echo ""
 echo "========================================================================="
 echo "Running Apache RAT checks"
 echo "========================================================================="
 dev/check-license
 
+echo ""
 echo "========================================================================="
 echo "Running Scala style checks"
 echo "========================================================================="
-dev/scalastyle
+dev/lint-scala
 
+echo ""
+echo "========================================================================="
+echo "Running Python style checks"
+echo "========================================================================="
+dev/lint-python
+
+echo ""
 echo "========================================================================="
 echo "Running Spark unit tests"
 echo "========================================================================="
@@ -89,11 +98,13 @@ fi
 echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS clean package assembly/assembly test | \
   grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
 
+echo ""
 echo "========================================================================="
 echo "Running PySpark tests"
 echo "========================================================================="
 ./python/run-tests
 
+echo ""
 echo "========================================================================="
 echo "Detecting binary incompatibilites with MiMa"
 echo "========================================================================="

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/dev/scalastyle
----------------------------------------------------------------------
diff --git a/dev/scalastyle b/dev/scalastyle
index d9f2b91..b53053a 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -30,5 +30,5 @@ if test ! -z "$ERRORS"; then
     echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS"
     exit 1
 else
-    echo -e "Scalastyle checks passed.\n"
+    echo -e "Scalastyle checks passed."
 fi

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/accumulators.py
----------------------------------------------------------------------
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 45d36e5..f133cf6 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -110,6 +110,7 @@ def _deserialize_accumulator(aid, zero_value, accum_param):
 
 
 class Accumulator(object):
+
     """
     A shared variable that can be accumulated, i.e., has a commutative and associative "add"
     operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
@@ -166,6 +167,7 @@ class Accumulator(object):
 
 
 class AccumulatorParam(object):
+
     """
     Helper object that defines how to accumulate values of a given type.
     """
@@ -186,6 +188,7 @@ class AccumulatorParam(object):
 
 
 class AddingAccumulatorParam(AccumulatorParam):
+
     """
     An AccumulatorParam that uses the + operators to add values. Designed for simple types
     such as integers, floats, and lists. Requires the zero value for the underlying type
@@ -210,6 +213,7 @@ COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
 
 
 class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
+
     """
     This handler will keep polling updates from the same socket until the
     server is shutdown.
@@ -228,7 +232,9 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
                 # Write a byte in acknowledgement
                 self.wfile.write(struct.pack("!b", 1))
 
+
 class AccumulatorServer(SocketServer.TCPServer):
+
     """
     A simple TCP server that intercepts shutdown() in order to interrupt
     our continuous polling on the handler.
@@ -239,6 +245,7 @@ class AccumulatorServer(SocketServer.TCPServer):
         self.server_shutdown = True
         SocketServer.TCPServer.shutdown(self)
 
+
 def _start_update_server():
     """Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
     server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler)

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/broadcast.py
----------------------------------------------------------------------
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 43f40f8..f3e6498 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -45,6 +45,7 @@ def _from_id(bid):
 
 
 class Broadcast(object):
+
     """
     A broadcast variable created with
     L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/conf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index b4c82f5..fb716f6 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -56,6 +56,7 @@ spark.home=/path
 
 
 class SparkConf(object):
+
     """
     Configuration for a Spark application. Used to set various Spark
     parameters as key-value pairs.

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 2e80eb5..4001eca 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -47,6 +47,7 @@ DEFAULT_CONFIGS = {
 
 
 class SparkContext(object):
+
     """
     Main entry point for Spark functionality. A SparkContext represents the
     connection to a Spark cluster, and can be used to create L{RDD}s and
@@ -213,7 +214,7 @@ class SparkContext(object):
 
             if instance:
                 if (SparkContext._active_spark_context and
-                   SparkContext._active_spark_context != instance):
+                        SparkContext._active_spark_context != instance):
                     currentMaster = SparkContext._active_spark_context.master
                     currentAppName = SparkContext._active_spark_context.appName
                     callsite = SparkContext._active_spark_context._callsite
@@ -406,7 +407,7 @@ class SparkContext(object):
         batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
         ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
-                    keyConverter, valueConverter, minSplits, batchSize)
+                                                keyConverter, valueConverter, minSplits, batchSize)
         return RDD(jrdd, self, ser)
 
     def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -437,7 +438,8 @@ class SparkContext(object):
         batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
         ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+                                                    valueClass, keyConverter, valueConverter,
+                                                    jconf, batchSize)
         return RDD(jrdd, self, ser)
 
     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -465,7 +467,8 @@ class SparkContext(object):
         batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
         ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
-                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+                                                   valueClass, keyConverter, valueConverter,
+                                                   jconf, batchSize)
         return RDD(jrdd, self, ser)
 
     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -496,7 +499,8 @@ class SparkContext(object):
         batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
         ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+                                              valueClass, keyConverter, valueConverter,
+                                              jconf, batchSize)
         return RDD(jrdd, self, ser)
 
     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -523,8 +527,9 @@ class SparkContext(object):
         jconf = self._dictToJavaMap(conf)
         batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
         ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
-        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
-                    keyConverter, valueConverter, jconf, batchSize)
+        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
+                                             valueClass, keyConverter, valueConverter,
+                                             jconf, batchSize)
         return RDD(jrdd, self, ser)
 
     def _checkpointFile(self, name, input_deserializer):
@@ -555,8 +560,7 @@ class SparkContext(object):
         first = rdds[0]._jrdd
         rest = [x._jrdd for x in rdds[1:]]
         rest = ListConverter().convert(rest, self._gateway._gateway_client)
-        return RDD(self._jsc.union(first, rest), self,
-                   rdds[0]._jrdd_deserializer)
+        return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)
 
     def broadcast(self, value):
         """
@@ -568,8 +572,7 @@ class SparkContext(object):
         pickleSer = PickleSerializer()
         pickled = pickleSer.dumps(value)
         jbroadcast = self._jsc.broadcast(bytearray(pickled))
-        return Broadcast(jbroadcast.id(), value, jbroadcast,
-                         self._pickled_broadcast_vars)
+        return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars)
 
     def accumulator(self, value, accum_param=None):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/daemon.py
----------------------------------------------------------------------
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index b00da83..e73538b 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -43,7 +43,7 @@ def worker(sock):
     """
     # Redirect stdout to stderr
     os.dup2(2, 1)
-    sys.stdout = sys.stderr   # The sys.stdout object is different from file descriptor 1
+    sys.stdout = sys.stderr  # The sys.stdout object is different from file descriptor 1
 
     signal.signal(SIGHUP, SIG_DFL)
     signal.signal(SIGCHLD, SIG_DFL)
@@ -134,8 +134,7 @@ def manager():
                 try:
                     os.kill(worker_pid, signal.SIGKILL)
                 except OSError:
-                    pass # process already died
-
+                    pass  # process already died
 
             if listen_sock in ready_fds:
                 sock, addr = listen_sock.accept()
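
The whitespace fixes in the Python diffs of this commit fall into a handful of pep8 categories. The snippet below is a small self-contained illustration, not code from Spark: every name in it is made up, and the error codes in the comments are the ones I understand pep8 to report for each kind of violation.

```python
# Hypothetical constants: a single space around "=" (E221/E222 flag extra spaces).
DENSE_MAGIC = 1
SPARSE_MAGIC = 2


def describe(magic, dense_label="dense", sparse_label="sparse",
             unknown_label="unknown"):
    # Continuation lines align with the opening bracket (E127/E128 otherwise).
    if magic == DENSE_MAGIC:
        return dense_label  # inline comments need two spaces before "#" (E261)
    if magic == SPARSE_MAGIC:
        return sparse_label
    return unknown_label


# Two blank lines separate top-level definitions (E302 otherwise).
def is_known(magic):
    return describe(magic) != "unknown"
```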

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/files.py
----------------------------------------------------------------------
diff --git a/python/pyspark/files.py b/python/pyspark/files.py
index 57ee14e..331de9a 100644
--- a/python/pyspark/files.py
+++ b/python/pyspark/files.py
@@ -19,6 +19,7 @@ import os
 
 
 class SparkFiles(object):
+
     """
     Resolves paths to files added through
     L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}.

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 2c12967..37386ab 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -65,6 +65,7 @@ def launch_gateway():
         # Create a thread to echo output from the GatewayServer, which is required
         # for Java log output to show up:
         class EchoOutputThread(Thread):
+
             def __init__(self, stream):
                 Thread.__init__(self)
                 self.daemon = True

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 9c1565a..db341da 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -72,9 +72,9 @@ except:
 # Python interpreter must agree on what endian the machine is.
 
 
-DENSE_VECTOR_MAGIC  = 1
+DENSE_VECTOR_MAGIC = 1
 SPARSE_VECTOR_MAGIC = 2
-DENSE_MATRIX_MAGIC  = 3
+DENSE_MATRIX_MAGIC = 3
 LABELED_POINT_MAGIC = 4
 
 
@@ -443,6 +443,7 @@ def _serialize_rating(r):
 
 
 class RatingDeserializer(Serializer):
+
     def loads(self, stream):
         length = struct.unpack("!i", stream.read(4))[0]
         ba = stream.read(length)

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 5ec1a80..ffdda7e 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -31,6 +31,7 @@ from math import exp, log
 
 
 class LogisticRegressionModel(LinearModel):
+
     """A linear binary classification model derived from logistic regression.
 
     >>> data = [
@@ -60,6 +61,7 @@ class LogisticRegressionModel(LinearModel):
     >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
     True
     """
+
     def predict(self, x):
         _linear_predictor_typecheck(x, self._coeff)
         margin = _dot(x, self._coeff) + self._intercept
@@ -72,6 +74,7 @@ class LogisticRegressionModel(LinearModel):
 
 
 class LogisticRegressionWithSGD(object):
+
     @classmethod
     def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
               initialWeights=None, regParam=1.0, regType=None, intercept=False):
@@ -108,6 +111,7 @@ class LogisticRegressionWithSGD(object):
 
 
 class SVMModel(LinearModel):
+
     """A support vector machine.
 
     >>> data = [
@@ -131,6 +135,7 @@ class SVMModel(LinearModel):
     >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0
     True
     """
+
     def predict(self, x):
         _linear_predictor_typecheck(x, self._coeff)
         margin = _dot(x, self._coeff) + self._intercept
@@ -138,6 +143,7 @@ class SVMModel(LinearModel):
 
 
 class SVMWithSGD(object):
+
     @classmethod
     def train(cls, data, iterations=100, step=1.0, regParam=1.0,
               miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False):
@@ -173,6 +179,7 @@ class SVMWithSGD(object):
 
 
 class NaiveBayesModel(object):
+
     """
     Model for Naive Bayes classifiers.
 
@@ -213,6 +220,7 @@ class NaiveBayesModel(object):
 
 
 class NaiveBayes(object):
+
     @classmethod
     def train(cls, data, lambda_=1.0):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index b380e8f..a0630d1 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -27,6 +27,7 @@ from pyspark.mllib.linalg import SparseVector
 
 
 class KMeansModel(object):
+
     """A clustering model derived from the k-means method.
 
     >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
@@ -55,6 +56,7 @@ class KMeansModel(object):
     >>> type(model.clusterCenters)
     <type 'list'>
     """
+
     def __init__(self, centers):
         self.centers = centers
 
@@ -76,6 +78,7 @@ class KMeansModel(object):
 
 
 class KMeans(object):
+
     @classmethod
     def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
         """Train a k-means clustering model."""

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 54720c2..9a239ab 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -27,6 +27,7 @@ from numpy import array, array_equal, ndarray, float64, int32
 
 
 class SparseVector(object):
+
     """
     A simple sparse vector class for passing data to MLlib. Users may
     alternatively pass SciPy's {scipy.sparse} data types.
@@ -192,6 +193,7 @@ class SparseVector(object):
 
 
 class Vectors(object):
+
     """
     Factory methods for working with vectors. Note that dense vectors
     are simply represented as NumPy array objects, so there is no need

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/random.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 36e710d..eb49668 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -24,7 +24,9 @@ from pyspark.rdd import RDD
 from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
 from pyspark.serializers import NoOpSerializer
 
+
 class RandomRDDGenerators:
+
     """
     Generator methods for creating RDDs comprised of i.i.d samples from
     some distribution.
@@ -53,7 +55,7 @@ class RandomRDDGenerators:
         True
         """
         jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
-        uniform =  RDD(jrdd, sc, NoOpSerializer())
+        uniform = RDD(jrdd, sc, NoOpSerializer())
         return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
 
     @staticmethod
@@ -77,7 +79,7 @@ class RandomRDDGenerators:
         True
         """
         jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
-        normal =  RDD(jrdd, sc, NoOpSerializer())
+        normal = RDD(jrdd, sc, NoOpSerializer())
         return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
 
     @staticmethod
@@ -98,7 +100,7 @@ class RandomRDDGenerators:
         True
         """
         jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
-        poisson =  RDD(jrdd, sc, NoOpSerializer())
+        poisson = RDD(jrdd, sc, NoOpSerializer())
         return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
 
     @staticmethod
@@ -118,7 +120,7 @@ class RandomRDDGenerators:
         """
         jrdd = sc._jvm.PythonMLLibAPI() \
             .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
-        uniform =  RDD(jrdd, sc, NoOpSerializer())
+        uniform = RDD(jrdd, sc, NoOpSerializer())
         return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
 
     @staticmethod
@@ -138,7 +140,7 @@ class RandomRDDGenerators:
         """
         jrdd = sc._jvm.PythonMLLibAPI() \
             .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
-        normal =  RDD(jrdd, sc, NoOpSerializer())
+        normal = RDD(jrdd, sc, NoOpSerializer())
         return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
 
     @staticmethod
@@ -161,7 +163,7 @@ class RandomRDDGenerators:
         """
         jrdd = sc._jvm.PythonMLLibAPI() \
             .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
-        poisson =  RDD(jrdd, sc, NoOpSerializer())
+        poisson = RDD(jrdd, sc, NoOpSerializer())
         return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 6c38504..e863fc2 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -26,6 +26,7 @@ from pyspark.rdd import RDD
 
 
 class MatrixFactorizationModel(object):
+
     """A matrix factorisation model trained by regularized alternating
     least-squares.
 
@@ -58,6 +59,7 @@ class MatrixFactorizationModel(object):
 
 
 class ALS(object):
+
     @classmethod
     def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
         sc = ratings.context

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 041b119..d8792cf 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -27,6 +27,7 @@ from pyspark.mllib.linalg import SparseVector, Vectors
 
 
 class LabeledPoint(object):
+
     """
     The features and labels of a data point.
 
@@ -34,6 +35,7 @@ class LabeledPoint(object):
     @param features: Vector of features for this point (NumPy array, list,
         pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
     """
+
     def __init__(self, label, features):
         self.label = label
         if (type(features) == ndarray or type(features) == SparseVector
@@ -49,7 +51,9 @@ class LabeledPoint(object):
 
 
 class LinearModel(object):
+
     """A linear model that has a vector of coefficients and an intercept."""
+
     def __init__(self, weights, intercept):
         self._coeff = weights
         self._intercept = intercept
@@ -64,6 +68,7 @@ class LinearModel(object):
 
 
 class LinearRegressionModelBase(LinearModel):
+
     """A linear regression model.
 
     >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
@@ -72,6 +77,7 @@ class LinearRegressionModelBase(LinearModel):
     >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
     True
     """
+
     def predict(self, x):
         """Predict the value of the dependent variable given a vector x"""
         """containing values for the independent variables."""
@@ -80,6 +86,7 @@ class LinearRegressionModelBase(LinearModel):
 
 
 class LinearRegressionModel(LinearRegressionModelBase):
+
     """A linear regression model derived from a least-squares fit.
 
     >>> from pyspark.mllib.regression import LabeledPoint
@@ -111,6 +118,7 @@ class LinearRegressionModel(LinearRegressionModelBase):
 
 
 class LinearRegressionWithSGD(object):
+
     @classmethod
     def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
               initialWeights=None, regParam=1.0, regType=None, intercept=False):
@@ -146,6 +154,7 @@ class LinearRegressionWithSGD(object):
 
 
 class LassoModel(LinearRegressionModelBase):
+
     """A linear regression model derived from a least-squares fit with an
     l_1 penalty term.
 
@@ -178,6 +187,7 @@ class LassoModel(LinearRegressionModelBase):
 
 
 class LassoWithSGD(object):
+
     @classmethod
     def train(cls, data, iterations=100, step=1.0, regParam=1.0,
               miniBatchFraction=1.0, initialWeights=None):
@@ -189,6 +199,7 @@ class LassoWithSGD(object):
 
 
 class RidgeRegressionModel(LinearRegressionModelBase):
+
     """A linear regression model derived from a least-squares fit with an
     l_2 penalty term.
 
@@ -221,6 +232,7 @@ class RidgeRegressionModel(LinearRegressionModelBase):
 
 
 class RidgeRegressionWithSGD(object):
+
     @classmethod
     def train(cls, data, iterations=100, step=1.0, regParam=1.0,
               miniBatchFraction=1.0, initialWeights=None):

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/stat.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index 0a08a56..982906b 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -24,6 +24,7 @@ from pyspark.mllib._common import \
     _serialize_double, _serialize_double_vector, \
     _deserialize_double, _deserialize_double_matrix
 
+
 class Statistics(object):
 
     @staticmethod

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 9d1e5be..6f3ec8a 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -39,6 +39,7 @@ except:
 
 
 class VectorTests(unittest.TestCase):
+
     def test_serialize(self):
         sv = SparseVector(4, {1: 1, 3: 2})
         dv = array([1., 2., 3., 4.])
@@ -81,6 +82,7 @@ class VectorTests(unittest.TestCase):
 
 
 class ListTests(PySparkTestCase):
+
     """
     Test MLlib algorithms on plain lists, to make sure they're passed through
     as NumPy arrays.
@@ -128,7 +130,7 @@ class ListTests(PySparkTestCase):
         self.assertTrue(nb_model.predict(features[2]) <= 0)
         self.assertTrue(nb_model.predict(features[3]) > 0)
 
-        categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
+        categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
         dt_model = \
             DecisionTree.trainClassifier(rdd, numClasses=2,
                                          categoricalFeaturesInfo=categoricalFeaturesInfo)
@@ -168,7 +170,7 @@ class ListTests(PySparkTestCase):
         self.assertTrue(rr_model.predict(features[2]) <= 0)
         self.assertTrue(rr_model.predict(features[3]) > 0)
 
-        categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
+        categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
         dt_model = \
             DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
         self.assertTrue(dt_model.predict(features[0]) <= 0)
@@ -179,6 +181,7 @@ class ListTests(PySparkTestCase):
 
 @unittest.skipIf(not _have_scipy, "SciPy not installed")
 class SciPyTests(PySparkTestCase):
+
     """
     Test both vector operations and MLlib algorithms with SciPy sparse matrices,
     if SciPy is available.
@@ -276,7 +279,7 @@ class SciPyTests(PySparkTestCase):
         self.assertTrue(nb_model.predict(features[2]) <= 0)
         self.assertTrue(nb_model.predict(features[3]) > 0)
 
-        categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
+        categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
         dt_model = DecisionTree.trainClassifier(rdd, numClasses=2,
                                                 categoricalFeaturesInfo=categoricalFeaturesInfo)
         self.assertTrue(dt_model.predict(features[0]) <= 0)
@@ -315,7 +318,7 @@ class SciPyTests(PySparkTestCase):
         self.assertTrue(rr_model.predict(features[2]) <= 0)
         self.assertTrue(rr_model.predict(features[3]) > 0)
 
-        categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
+        categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
         dt_model = DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
         self.assertTrue(dt_model.predict(features[0]) <= 0)
         self.assertTrue(dt_model.predict(features[1]) > 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/tree.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 1e0006d..2518001 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -25,7 +25,9 @@ from pyspark.mllib._common import \
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.serializers import NoOpSerializer
 
+
 class DecisionTreeModel(object):
+
     """
     A decision tree model for classification or regression.
 
@@ -77,6 +79,7 @@ class DecisionTreeModel(object):
 
 
 class DecisionTree(object):
+
     """
     Learning algorithm for a decision tree model
     for classification or regression.
@@ -174,7 +177,6 @@ class DecisionTree(object):
                                   categoricalFeaturesInfo,
                                   impurity, maxDepth, maxBins)
 
-
     @staticmethod
     def train(data, algo, numClasses, categoricalFeaturesInfo,
               impurity, maxDepth, maxBins=100):

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 639cda6..4962d05 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -26,6 +26,7 @@ from pyspark.serializers import NoOpSerializer
 
 
 class MLUtils:
+
     """
     Helper methods to load, save and pre-process data used in MLlib.
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 309f5a9..30b834d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -233,7 +233,7 @@ class RDD(object):
 
     def _toPickleSerialization(self):
         if (self._jrdd_deserializer == PickleSerializer() or
-            self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+                self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
             return self
         else:
             return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
@@ -1079,7 +1079,9 @@ class RDD(object):
         pickledRDD = self._toPickleSerialization()
         batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
         self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
-            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+                                                       outputFormatClass,
+                                                       keyClass, valueClass,
+                                                       keyConverter, valueConverter, jconf)
 
     def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
         """
@@ -1125,8 +1127,10 @@ class RDD(object):
         pickledRDD = self._toPickleSerialization()
         batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
         self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
-            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
-            jconf, compressionCodecClass)
+                                                 outputFormatClass,
+                                                 keyClass, valueClass,
+                                                 keyConverter, valueConverter,
+                                                 jconf, compressionCodecClass)
 
     def saveAsSequenceFile(self, path, compressionCodecClass=None):
         """
@@ -1348,7 +1352,7 @@ class RDD(object):
         outputSerializer = self.ctx._unbatched_serializer
 
         limit = (_parse_memory(self.ctx._conf.get(
-                    "spark.python.worker.memory", "512m")) / 2)
+            "spark.python.worker.memory", "512m")) / 2)
 
         def add_shuffle_key(split, iterator):
 
@@ -1430,12 +1434,12 @@ class RDD(object):
         spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
                  == 'true')
         memory = _parse_memory(self.ctx._conf.get(
-                    "spark.python.worker.memory", "512m"))
+            "spark.python.worker.memory", "512m"))
         agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
 
         def combineLocally(iterator):
             merger = ExternalMerger(agg, memory * 0.9, serializer) \
-                         if spill else InMemoryMerger(agg)
+                if spill else InMemoryMerger(agg)
             merger.mergeValues(iterator)
             return merger.iteritems()
 
@@ -1444,7 +1448,7 @@ class RDD(object):
 
         def _mergeCombiners(iterator):
             merger = ExternalMerger(agg, memory, serializer) \
-                         if spill else InMemoryMerger(agg)
+                if spill else InMemoryMerger(agg)
             merger.mergeCombiners(iterator)
             return merger.iteritems()
 
@@ -1588,7 +1592,7 @@ class RDD(object):
         """
         for fraction in fractions.values():
             assert fraction >= 0.0, "Negative fraction value: %s" % fraction
-        return self.mapPartitionsWithIndex( \
+        return self.mapPartitionsWithIndex(
             RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
 
     def subtractByKey(self, other, numPartitions=None):

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/rddsampler.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 2df000f..55e247d 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -20,6 +20,7 @@ import random
 
 
 class RDDSamplerBase(object):
+
     def __init__(self, withReplacement, seed=None):
         try:
             import numpy
@@ -95,6 +96,7 @@ class RDDSamplerBase(object):
 
 
 class RDDSampler(RDDSamplerBase):
+
     def __init__(self, withReplacement, fraction, seed=None):
         RDDSamplerBase.__init__(self, withReplacement, seed)
         self._fraction = fraction
@@ -113,7 +115,9 @@ class RDDSampler(RDDSamplerBase):
                 if self.getUniformSample(split) <= self._fraction:
                     yield obj
 
+
 class RDDStratifiedSampler(RDDSamplerBase):
+
     def __init__(self, withReplacement, fractions, seed=None):
         RDDSamplerBase.__init__(self, withReplacement, seed)
         self._fractions = fractions

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/resultiterable.py
----------------------------------------------------------------------
diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py
index df34740..ef04c82 100644
--- a/python/pyspark/resultiterable.py
+++ b/python/pyspark/resultiterable.py
@@ -21,9 +21,11 @@ import collections
 
 
 class ResultIterable(collections.Iterable):
+
     """
     A special result iterable. This is used because the standard iterator can not be pickled
     """
+
     def __init__(self, data):
         self.data = data
         self.index = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index a10f85b..b35558d 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -111,6 +111,7 @@ class Serializer(object):
 
 
 class FramedSerializer(Serializer):
+
     """
     Serializer that writes objects as a stream of (length, data) pairs,
     where C{length} is a 32-bit integer and data is C{length} bytes.
@@ -162,6 +163,7 @@ class FramedSerializer(Serializer):
 
 
 class BatchedSerializer(Serializer):
+
     """
     Serializes a stream of objects in batches by calling its wrapped
     Serializer with streams of objects.
@@ -207,6 +209,7 @@ class BatchedSerializer(Serializer):
 
 
 class CartesianDeserializer(FramedSerializer):
+
     """
     Deserializes the JavaRDD cartesian() of two PythonRDDs.
     """
@@ -240,6 +243,7 @@ class CartesianDeserializer(FramedSerializer):
 
 
 class PairDeserializer(CartesianDeserializer):
+
     """
     Deserializes the JavaRDD zip() of two PythonRDDs.
     """
@@ -289,6 +293,7 @@ def _hack_namedtuple(cls):
     """ Make class generated by namedtuple picklable """
     name = cls.__name__
     fields = cls._fields
+
     def __reduce__(self):
         return (_restore, (name, fields, tuple(self)))
     cls.__reduce__ = __reduce__
@@ -301,10 +306,11 @@ def _hijack_namedtuple():
     if hasattr(collections.namedtuple, "__hijack"):
         return
 
-    global _old_namedtuple # or it will put in closure
+    global _old_namedtuple  # or it will put in closure
+
     def _copy_func(f):
         return types.FunctionType(f.func_code, f.func_globals, f.func_name,
-                f.func_defaults, f.func_closure)
+                                  f.func_defaults, f.func_closure)
 
     _old_namedtuple = _copy_func(collections.namedtuple)
 
@@ -323,15 +329,16 @@ def _hijack_namedtuple():
     # so only hack those in __main__ module
     for n, o in sys.modules["__main__"].__dict__.iteritems():
         if (type(o) is type and o.__base__ is tuple
-            and hasattr(o, "_fields")
-            and "__reduce__" not in o.__dict__):
-            _hack_namedtuple(o) # hack inplace
+                and hasattr(o, "_fields")
+                and "__reduce__" not in o.__dict__):
+            _hack_namedtuple(o)  # hack inplace
 
 
 _hijack_namedtuple()
 
 
 class PickleSerializer(FramedSerializer):
+
     """
     Serializes objects using Python's cPickle serializer:
 
@@ -354,6 +361,7 @@ class CloudPickleSerializer(PickleSerializer):
 
 
 class MarshalSerializer(FramedSerializer):
+
     """
     Serializes objects using Python's Marshal serializer:
 
@@ -367,9 +375,11 @@ class MarshalSerializer(FramedSerializer):
 
 
 class AutoSerializer(FramedSerializer):
+
     """
     Choose marshal or cPickle as serialization protocol autumatically
     """
+
     def __init__(self):
         FramedSerializer.__init__(self)
         self._type = None
@@ -394,6 +404,7 @@ class AutoSerializer(FramedSerializer):
 
 
 class UTF8Deserializer(Serializer):
+
     """
     Deserializes streams written by String.getBytes.
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index e3923d1..2c68cd4 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -45,7 +45,7 @@ except ImportError:
                     return int(line.split()[1]) >> 10
         else:
             warnings.warn("Please install psutil to have better "
-                    "support with spilling")
+                          "support with spilling")
             if platform.system() == "Darwin":
                 import resource
                 rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
@@ -141,7 +141,7 @@ class ExternalMerger(Merger):
 
     This class works as follows:
 
-    - It repeatedly combine the items and save them in one dict in 
+    - It repeatedly combine the items and save them in one dict in
       memory.
 
     - When the used memory goes above memory limit, it will split
@@ -190,12 +190,12 @@ class ExternalMerger(Merger):
     MAX_TOTAL_PARTITIONS = 4096
 
     def __init__(self, aggregator, memory_limit=512, serializer=None,
-            localdirs=None, scale=1, partitions=59, batch=1000):
+                 localdirs=None, scale=1, partitions=59, batch=1000):
         Merger.__init__(self, aggregator)
         self.memory_limit = memory_limit
         # default serializer is only used for tests
         self.serializer = serializer or \
-                BatchedSerializer(PickleSerializer(), 1024)
+            BatchedSerializer(PickleSerializer(), 1024)
         self.localdirs = localdirs or self._get_dirs()
         # number of partitions when spill data into disks
         self.partitions = partitions
@@ -341,7 +341,7 @@ class ExternalMerger(Merger):
                 self.pdata[i].clear()
 
         self.spills += 1
-        gc.collect() # release the memory as much as possible
+        gc.collect()  # release the memory as much as possible
 
     def iteritems(self):
         """ Return all merged items as iterator """
@@ -370,8 +370,8 @@ class ExternalMerger(Merger):
                     if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
                             and j < self.spills - 1
                             and get_used_memory() > hard_limit):
-                        self.data.clear() # will read from disk again
-                        gc.collect() # release the memory as much as possible
+                        self.data.clear()  # will read from disk again
+                        gc.collect()  # release the memory as much as possible
                         for v in self._recursive_merged_items(i):
                             yield v
                         return
@@ -409,9 +409,9 @@ class ExternalMerger(Merger):
 
         for i in range(start, self.partitions):
             subdirs = [os.path.join(d, "parts", str(i))
-                            for d in self.localdirs]
+                       for d in self.localdirs]
             m = ExternalMerger(self.agg, self.memory_limit, self.serializer,
-                    subdirs, self.scale * self.partitions)
+                               subdirs, self.scale * self.partitions)
             m.pdata = [{} for _ in range(self.partitions)]
             limit = self._next_limit()
 
@@ -419,7 +419,7 @@ class ExternalMerger(Merger):
                 path = self._get_spill_dir(j)
                 p = os.path.join(path, str(i))
                 m._partitioned_mergeCombiners(
-                        self.serializer.load_stream(open(p)))
+                    self.serializer.load_stream(open(p)))
 
                 if get_used_memory() > limit:
                     m._spill()

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index adc56e7..950e275 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -45,6 +45,7 @@ __all__ = [
 
 
 class DataType(object):
+
     """Spark SQL DataType"""
 
     def __repr__(self):
@@ -62,6 +63,7 @@ class DataType(object):
 
 
 class PrimitiveTypeSingleton(type):
+
     """Metaclass for PrimitiveType"""
 
     _instances = {}
@@ -73,6 +75,7 @@ class PrimitiveTypeSingleton(type):
 
 
 class PrimitiveType(DataType):
+
     """Spark SQL PrimitiveType"""
 
     __metaclass__ = PrimitiveTypeSingleton
@@ -83,6 +86,7 @@ class PrimitiveType(DataType):
 
 
 class StringType(PrimitiveType):
+
     """Spark SQL StringType
 
     The data type representing string values.
@@ -90,6 +94,7 @@ class StringType(PrimitiveType):
 
 
 class BinaryType(PrimitiveType):
+
     """Spark SQL BinaryType
 
     The data type representing bytearray values.
@@ -97,6 +102,7 @@ class BinaryType(PrimitiveType):
 
 
 class BooleanType(PrimitiveType):
+
     """Spark SQL BooleanType
 
     The data type representing bool values.
@@ -104,6 +110,7 @@ class BooleanType(PrimitiveType):
 
 
 class TimestampType(PrimitiveType):
+
     """Spark SQL TimestampType
 
     The data type representing datetime.datetime values.
@@ -111,6 +118,7 @@ class TimestampType(PrimitiveType):
 
 
 class DecimalType(PrimitiveType):
+
     """Spark SQL DecimalType
 
     The data type representing decimal.Decimal values.
@@ -118,6 +126,7 @@ class DecimalType(PrimitiveType):
 
 
 class DoubleType(PrimitiveType):
+
     """Spark SQL DoubleType
 
     The data type representing float values.
@@ -125,6 +134,7 @@ class DoubleType(PrimitiveType):
 
 
 class FloatType(PrimitiveType):
+
     """Spark SQL FloatType
 
     The data type representing single precision floating-point values.
@@ -132,6 +142,7 @@ class FloatType(PrimitiveType):
 
 
 class ByteType(PrimitiveType):
+
     """Spark SQL ByteType
 
     The data type representing int values with 1 singed byte.
@@ -139,6 +150,7 @@ class ByteType(PrimitiveType):
 
 
 class IntegerType(PrimitiveType):
+
     """Spark SQL IntegerType
 
     The data type representing int values.
@@ -146,6 +158,7 @@ class IntegerType(PrimitiveType):
 
 
 class LongType(PrimitiveType):
+
     """Spark SQL LongType
 
     The data type representing long values. If the any value is
@@ -155,6 +168,7 @@ class LongType(PrimitiveType):
 
 
 class ShortType(PrimitiveType):
+
     """Spark SQL ShortType
 
     The data type representing int values with 2 signed bytes.
@@ -162,6 +176,7 @@ class ShortType(PrimitiveType):
 
 
 class ArrayType(DataType):
+
     """Spark SQL ArrayType
 
     The data type representing list values. An ArrayType object
@@ -187,10 +202,11 @@ class ArrayType(DataType):
 
     def __str__(self):
         return "ArrayType(%s,%s)" % (self.elementType,
-               str(self.containsNull).lower())
+                                     str(self.containsNull).lower())
 
 
 class MapType(DataType):
+
     """Spark SQL MapType
 
     The data type representing dict values. A MapType object comprises
@@ -226,10 +242,11 @@ class MapType(DataType):
 
     def __repr__(self):
         return "MapType(%s,%s,%s)" % (self.keyType, self.valueType,
-               str(self.valueContainsNull).lower())
+                                      str(self.valueContainsNull).lower())
 
 
 class StructField(DataType):
+
     """Spark SQL StructField
 
     Represents a field in a StructType.
@@ -263,10 +280,11 @@ class StructField(DataType):
 
     def __repr__(self):
         return "StructField(%s,%s,%s)" % (self.name, self.dataType,
-               str(self.nullable).lower())
+                                          str(self.nullable).lower())
 
 
 class StructType(DataType):
+
     """Spark SQL StructType
 
     The data type representing rows.
@@ -291,7 +309,7 @@ class StructType(DataType):
 
     def __repr__(self):
         return ("StructType(List(%s))" %
-                    ",".join(str(field) for field in self.fields))
+                ",".join(str(field) for field in self.fields))
 
 
 def _parse_datatype_list(datatype_list_string):
@@ -319,7 +337,7 @@ def _parse_datatype_list(datatype_list_string):
 
 
 _all_primitive_types = dict((k, v) for k, v in globals().iteritems()
-    if type(v) is PrimitiveTypeSingleton and v.__base__ == PrimitiveType)
+                            if type(v) is PrimitiveTypeSingleton and v.__base__ == PrimitiveType)
 
 
 def _parse_datatype_string(datatype_string):
@@ -459,16 +477,16 @@ def _infer_schema(row):
         items = sorted(row.items())
 
     elif isinstance(row, tuple):
-        if hasattr(row, "_fields"): # namedtuple
+        if hasattr(row, "_fields"):  # namedtuple
             items = zip(row._fields, tuple(row))
-        elif hasattr(row, "__FIELDS__"): # Row
+        elif hasattr(row, "__FIELDS__"):  # Row
             items = zip(row.__FIELDS__, tuple(row))
         elif all(isinstance(x, tuple) and len(x) == 2 for x in row):
             items = row
         else:
             raise ValueError("Can't infer schema from tuple")
 
-    elif hasattr(row, "__dict__"): # object
+    elif hasattr(row, "__dict__"):  # object
         items = sorted(row.__dict__.items())
 
     else:
@@ -499,7 +517,7 @@ def _create_converter(obj, dataType):
         conv = lambda o: tuple(o.get(n) for n in names)
 
     elif isinstance(obj, tuple):
-        if hasattr(obj, "_fields"): # namedtuple
+        if hasattr(obj, "_fields"):  # namedtuple
             conv = tuple
         elif hasattr(obj, "__FIELDS__"):
             conv = tuple
@@ -508,7 +526,7 @@ def _create_converter(obj, dataType):
         else:
             raise ValueError("unexpected tuple")
 
-    elif hasattr(obj, "__dict__"): # object
+    elif hasattr(obj, "__dict__"):  # object
         conv = lambda o: [o.__dict__.get(n, None) for n in names]
 
     nested = any(_has_struct(f.dataType) for f in dataType.fields)
@@ -660,7 +678,7 @@ def _infer_schema_type(obj, dataType):
         assert len(fs) == len(obj), \
             "Obj(%s) have different length with fields(%s)" % (obj, fs)
         fields = [StructField(f.name, _infer_schema_type(o, f.dataType), True)
-                    for o, f in zip(obj, fs)]
+                  for o, f in zip(obj, fs)]
         return StructType(fields)
 
     else:
@@ -683,6 +701,7 @@ _acceptable_types = {
     StructType: (tuple, list),
 }
 
+
 def _verify_type(obj, dataType):
     """
     Verify the type of obj against dataType, raise an exception if
@@ -728,7 +747,7 @@ def _verify_type(obj, dataType):
     elif isinstance(dataType, StructType):
         if len(obj) != len(dataType.fields):
             raise ValueError("Length of object (%d) does not match with"
-                "length of fields (%d)" % (len(obj), len(dataType.fields)))
+                             "length of fields (%d)" % (len(obj), len(dataType.fields)))
         for v, f in zip(obj, dataType.fields):
             _verify_type(v, f.dataType)
 
@@ -861,6 +880,7 @@ def _create_cls(dataType):
         raise Exception("unexpected data type: %s" % dataType)
 
     class Row(tuple):
+
         """ Row in SchemaRDD """
         __DATATYPE__ = dataType
         __FIELDS__ = tuple(f.name for f in dataType.fields)
@@ -872,7 +892,7 @@ def _create_cls(dataType):
         def __repr__(self):
             # call collect __repr__ for nested objects
             return ("Row(%s)" % ", ".join("%s=%r" % (n, getattr(self, n))
-                    for n in self.__FIELDS__))
+                                          for n in self.__FIELDS__))
 
         def __reduce__(self):
             return (_restore_object, (self.__DATATYPE__, tuple(self)))
@@ -881,6 +901,7 @@ def _create_cls(dataType):
 
 
 class SQLContext:
+
     """Main entry point for SparkSQL functionality.
 
     A SQLContext can be used create L{SchemaRDD}s, register L{SchemaRDD}s as
@@ -960,7 +981,7 @@ class SQLContext:
         env = MapConverter().convert(self._sc.environment,
                                      self._sc._gateway._gateway_client)
         includes = ListConverter().convert(self._sc._python_includes,
-                                     self._sc._gateway._gateway_client)
+                                           self._sc._gateway._gateway_client)
         self._ssql_ctx.registerPython(name,
                                       bytearray(CloudPickleSerializer().dumps(command)),
                                       env,
@@ -1012,7 +1033,7 @@ class SQLContext:
         first = rdd.first()
         if not first:
             raise ValueError("The first row in RDD is empty, "
-                    "can not infer schema")
+                             "can not infer schema")
         if type(first) is dict:
             warnings.warn("Using RDD of dict to inferSchema is deprecated")
 
@@ -1287,6 +1308,7 @@ class SQLContext:
 
 
 class HiveContext(SQLContext):
+
     """A variant of Spark SQL that integrates with data stored in Hive.
 
     Configuration for Hive is read from hive-site.xml on the classpath.
@@ -1327,6 +1349,7 @@ class HiveContext(SQLContext):
 
 
 class LocalHiveContext(HiveContext):
+
     """Starts up an instance of hive where metadata is stored locally.
 
     An in-process metadata data is created with data stored in ./metadata.
@@ -1357,7 +1380,7 @@ class LocalHiveContext(HiveContext):
     def __init__(self, sparkContext, sqlContext=None):
         HiveContext.__init__(self, sparkContext, sqlContext)
         warnings.warn("LocalHiveContext is deprecated. "
-                "Use HiveContext instead.", DeprecationWarning)
+                      "Use HiveContext instead.", DeprecationWarning)
 
     def _get_hive_ctx(self):
         return self._jvm.LocalHiveContext(self._jsc.sc())
@@ -1376,6 +1399,7 @@ def _create_row(fields, values):
 
 
 class Row(tuple):
+
     """
     A row in L{SchemaRDD}. The fields in it can be accessed like attributes.
 
@@ -1417,7 +1441,6 @@ class Row(tuple):
         else:
             raise ValueError("No args or kwargs")
 
-
     # let obect acs like class
     def __call__(self, *args):
         """create new Row object"""
@@ -1443,12 +1466,13 @@ class Row(tuple):
     def __repr__(self):
         if hasattr(self, "__FIELDS__"):
             return "Row(%s)" % ", ".join("%s=%r" % (k, v)
-                for k, v in zip(self.__FIELDS__, self))
+                                         for k, v in zip(self.__FIELDS__, self))
         else:
             return "<Row(%s)>" % ", ".join(self)
 
 
 class SchemaRDD(RDD):
+
     """An RDD of L{Row} objects that has an associated schema.
 
     The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can
@@ -1659,7 +1683,7 @@ class SchemaRDD(RDD):
                 rdd = self._jschema_rdd.subtract(other._jschema_rdd)
             else:
                 rdd = self._jschema_rdd.subtract(other._jschema_rdd,
-                        numPartitions)
+                                                 numPartitions)
             return SchemaRDD(rdd, self.sql_ctx)
         else:
             raise ValueError("Can only subtract another SchemaRDD")
@@ -1686,9 +1710,9 @@ def _test():
     jsonStrings = [
         '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
         '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
-            '"field6":[{"field7": "row2"}]}',
+        '"field6":[{"field7": "row2"}]}',
         '{"field1" : null, "field2": "row3", '
-            '"field3":{"field4":33, "field5": []}}'
+        '"field3":{"field4":33, "field5": []}}'
     ]
     globs['jsonStrings'] = jsonStrings
     globs['json'] = sc.parallelize(jsonStrings)

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/storagelevel.py
----------------------------------------------------------------------
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 5d77a13..2aa0fb9 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -19,6 +19,7 @@ __all__ = ["StorageLevel"]
 
 
 class StorageLevel:
+
     """
     Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
     whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 4ac94ba..88a6117 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -62,53 +62,53 @@ class TestMerger(unittest.TestCase):
         self.N = 1 << 16
         self.l = [i for i in xrange(self.N)]
         self.data = zip(self.l, self.l)
-        self.agg = Aggregator(lambda x: [x], 
-                lambda x, y: x.append(y) or x,
-                lambda x, y: x.extend(y) or x)
+        self.agg = Aggregator(lambda x: [x],
+                              lambda x, y: x.append(y) or x,
+                              lambda x, y: x.extend(y) or x)
 
     def test_in_memory(self):
         m = InMemoryMerger(self.agg)
         m.mergeValues(self.data)
         self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
-                sum(xrange(self.N)))
+                         sum(xrange(self.N)))
 
         m = InMemoryMerger(self.agg)
         m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data))
         self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
-                sum(xrange(self.N)))
+                         sum(xrange(self.N)))
 
     def test_small_dataset(self):
         m = ExternalMerger(self.agg, 1000)
         m.mergeValues(self.data)
         self.assertEqual(m.spills, 0)
         self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
-                sum(xrange(self.N)))
+                         sum(xrange(self.N)))
 
         m = ExternalMerger(self.agg, 1000)
         m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data))
         self.assertEqual(m.spills, 0)
         self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
-                sum(xrange(self.N)))
+                         sum(xrange(self.N)))
 
     def test_medium_dataset(self):
         m = ExternalMerger(self.agg, 10)
         m.mergeValues(self.data)
         self.assertTrue(m.spills >= 1)
         self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
-                sum(xrange(self.N)))
+                         sum(xrange(self.N)))
 
         m = ExternalMerger(self.agg, 10)
         m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data * 3))
         self.assertTrue(m.spills >= 1)
         self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
-                sum(xrange(self.N)) * 3)
+                         sum(xrange(self.N)) * 3)
 
     def test_huge_dataset(self):
         m = ExternalMerger(self.agg, 10)
         m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10))
         self.assertTrue(m.spills >= 1)
         self.assertEqual(sum(len(v) for k, v in m._recursive_merged_items(0)),
-                self.N * 10)
+                         self.N * 10)
         m._cleanup()
 
 
@@ -188,6 +188,7 @@ class TestAddFile(PySparkTestCase):
         log4j = self.sc._jvm.org.apache.log4j
         old_level = log4j.LogManager.getRootLogger().getLevel()
         log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
+
         def func(x):
             from userlibrary import UserClass
             return UserClass().hello()
@@ -355,8 +356,8 @@ class TestInputFormat(PySparkTestCase):
         self.assertEqual(doubles, ed)
 
         bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
-                                              "org.apache.hadoop.io.IntWritable",
-                                              "org.apache.hadoop.io.BytesWritable").collect())
+                                            "org.apache.hadoop.io.IntWritable",
+                                            "org.apache.hadoop.io.BytesWritable").collect())
         ebs = [(1, bytearray('aa', 'utf-8')),
                (1, bytearray('aa', 'utf-8')),
                (2, bytearray('aa', 'utf-8')),
@@ -428,9 +429,9 @@ class TestInputFormat(PySparkTestCase):
         self.assertEqual(clazz[0], ec)
 
         unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
-                                            "org.apache.hadoop.io.Text",
-                                            "org.apache.spark.api.python.TestWritable",
-                                            batchSize=1).collect())
+                                                      "org.apache.hadoop.io.Text",
+                                                      "org.apache.spark.api.python.TestWritable",
+                                                      batchSize=1).collect())
         self.assertEqual(unbatched_clazz[0], ec)
 
     def test_oldhadoop(self):
@@ -443,7 +444,7 @@ class TestInputFormat(PySparkTestCase):
         self.assertEqual(ints, ei)
 
         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
-        oldconf = {"mapred.input.dir" : hellopath}
+        oldconf = {"mapred.input.dir": hellopath}
         hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
                                   "org.apache.hadoop.io.LongWritable",
                                   "org.apache.hadoop.io.Text",
@@ -462,7 +463,7 @@ class TestInputFormat(PySparkTestCase):
         self.assertEqual(ints, ei)
 
         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
-        newconf = {"mapred.input.dir" : hellopath}
+        newconf = {"mapred.input.dir": hellopath}
         hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                                         "org.apache.hadoop.io.LongWritable",
                                         "org.apache.hadoop.io.Text",
@@ -517,6 +518,7 @@ class TestInputFormat(PySparkTestCase):
               (u'\x03', [2.0])]
         self.assertEqual(maps, em)
 
+
 class TestOutputFormat(PySparkTestCase):
 
     def setUp(self):
@@ -574,8 +576,8 @@ class TestOutputFormat(PySparkTestCase):
     def test_oldhadoop(self):
         basepath = self.tempdir.name
         dict_data = [(1, {}),
-                     (1, {"row1" : 1.0}),
-                     (2, {"row2" : 2.0})]
+                     (1, {"row1": 1.0}),
+                     (2, {"row2": 2.0})]
         self.sc.parallelize(dict_data).saveAsHadoopFile(
             basepath + "/oldhadoop/",
             "org.apache.hadoop.mapred.SequenceFileOutputFormat",
@@ -589,12 +591,13 @@ class TestOutputFormat(PySparkTestCase):
         self.assertEqual(result, dict_data)
 
         conf = {
-            "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
-            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
-            "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
-            "mapred.output.dir" : basepath + "/olddataset/"}
+            "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+            "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class": "org.apache.hadoop.io.MapWritable",
+            "mapred.output.dir": basepath + "/olddataset/"
+        }
         self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
-        input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+        input_conf = {"mapred.input.dir": basepath + "/olddataset/"}
         old_dataset = sorted(self.sc.hadoopRDD(
             "org.apache.hadoop.mapred.SequenceFileInputFormat",
             "org.apache.hadoop.io.IntWritable",
@@ -622,14 +625,17 @@ class TestOutputFormat(PySparkTestCase):
             valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
         self.assertEqual(result, array_data)
 
-        conf = {"mapreduce.outputformat.class" :
-                     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
-                 "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
-                 "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
-                 "mapred.output.dir" : basepath + "/newdataset/"}
-        self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+        conf = {
+            "mapreduce.outputformat.class":
+                "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+            "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
+            "mapred.output.dir": basepath + "/newdataset/"
+        }
+        self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(
+            conf,
             valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
-        input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+        input_conf = {"mapred.input.dir": basepath + "/newdataset/"}
         new_dataset = sorted(self.sc.newAPIHadoopRDD(
             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
             "org.apache.hadoop.io.IntWritable",
@@ -640,7 +646,7 @@ class TestOutputFormat(PySparkTestCase):
 
     def test_newolderror(self):
         basepath = self.tempdir.name
-        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
         self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
             basepath + "/newolderror/saveAsHadoopFile/",
             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
@@ -650,7 +656,7 @@ class TestOutputFormat(PySparkTestCase):
 
     def test_bad_inputs(self):
         basepath = self.tempdir.name
-        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
         self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
             basepath + "/badinputs/saveAsHadoopFile/",
             "org.apache.hadoop.mapred.NotValidOutputFormat"))
@@ -685,30 +691,32 @@ class TestOutputFormat(PySparkTestCase):
         result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
         self.assertEqual(result1, data)
 
-        rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
-                             "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+        rdd.saveAsHadoopFile(
+            basepath + "/reserialize/hadoop",
+            "org.apache.hadoop.mapred.SequenceFileOutputFormat")
         result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
         self.assertEqual(result2, data)
 
-        rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
-                             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+        rdd.saveAsNewAPIHadoopFile(
+            basepath + "/reserialize/newhadoop",
+            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
         result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
         self.assertEqual(result3, data)
 
         conf4 = {
-            "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
-            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
-            "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
-            "mapred.output.dir" : basepath + "/reserialize/dataset"}
+            "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+            "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
+            "mapred.output.dir": basepath + "/reserialize/dataset"}
         rdd.saveAsHadoopDataset(conf4)
         result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
         self.assertEqual(result4, data)
 
-        conf5 = {"mapreduce.outputformat.class" :
-                     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
-            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
-            "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
-            "mapred.output.dir" : basepath + "/reserialize/newdataset"}
+        conf5 = {"mapreduce.outputformat.class":
+                 "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+                 "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+                 "mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
+                 "mapred.output.dir": basepath + "/reserialize/newdataset"}
         rdd.saveAsNewAPIHadoopDataset(conf5)
         result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
         self.assertEqual(result5, data)
@@ -719,25 +727,28 @@ class TestOutputFormat(PySparkTestCase):
         self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
             basepath + "/unbatched/")
 
-        unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+        unbatched_sequence = sorted(self.sc.sequenceFile(
+            basepath + "/unbatched/",
             batchSize=1).collect())
         self.assertEqual(unbatched_sequence, ei)
 
-        unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+        unbatched_hadoopFile = sorted(self.sc.hadoopFile(
+            basepath + "/unbatched/",
             "org.apache.hadoop.mapred.SequenceFileInputFormat",
             "org.apache.hadoop.io.IntWritable",
             "org.apache.hadoop.io.Text",
             batchSize=1).collect())
         self.assertEqual(unbatched_hadoopFile, ei)
 
-        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
+            basepath + "/unbatched/",
             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
             "org.apache.hadoop.io.IntWritable",
             "org.apache.hadoop.io.Text",
             batchSize=1).collect())
         self.assertEqual(unbatched_newAPIHadoopFile, ei)
 
-        oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+        oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
         unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
             "org.apache.hadoop.mapred.SequenceFileInputFormat",
             "org.apache.hadoop.io.IntWritable",
@@ -746,7 +757,7 @@ class TestOutputFormat(PySparkTestCase):
             batchSize=1).collect())
         self.assertEqual(unbatched_hadoopRDD, ei)
 
-        newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+        newconf = {"mapred.input.dir": basepath + "/unbatched/"}
         unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
             "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
             "org.apache.hadoop.io.IntWritable",
@@ -763,7 +774,9 @@ class TestOutputFormat(PySparkTestCase):
         self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
             basepath + "/malformed/sequence"))
 
+
 class TestDaemon(unittest.TestCase):
+
     def connect(self, port):
         from socket import socket, AF_INET, SOCK_STREAM
         sock = socket(AF_INET, SOCK_STREAM)
@@ -810,12 +823,15 @@ class TestDaemon(unittest.TestCase):
 
 
 class TestWorker(PySparkTestCase):
+
     def test_cancel_task(self):
         temp = tempfile.NamedTemporaryFile(delete=True)
         temp.close()
         path = temp.name
+
         def sleep(x):
-            import os, time
+            import os
+            import time
             with open(path, 'w') as f:
                 f.write("%d %d" % (os.getppid(), os.getpid()))
             time.sleep(100)
@@ -845,7 +861,7 @@ class TestWorker(PySparkTestCase):
                 os.kill(worker_pid, 0)
                 time.sleep(0.1)
             except OSError:
-                break # worker was killed
+                break  # worker was killed
         else:
             self.fail("worker has not been killed after 5 seconds")
 
@@ -855,12 +871,13 @@ class TestWorker(PySparkTestCase):
             self.fail("daemon had been killed")
 
     def test_fd_leak(self):
-        N = 1100 # fd limit is 1024 by default
+        N = 1100  # fd limit is 1024 by default
         rdd = self.sc.parallelize(range(N), N)
         self.assertEquals(N, rdd.count())
 
 
 class TestSparkSubmit(unittest.TestCase):
+
     def setUp(self):
         self.programDir = tempfile.mkdtemp()
         self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
@@ -953,9 +970,9 @@ class TestSparkSubmit(unittest.TestCase):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen(
-            [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script],
-            stdout=subprocess.PIPE)
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+                                "local-cluster[1,1,512]", script],
+                                stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out)
@@ -981,6 +998,7 @@ class TestSparkSubmit(unittest.TestCase):
 
 @unittest.skipIf(not _have_scipy, "SciPy not installed")
 class SciPyTests(PySparkTestCase):
+
     """General PySpark tests that depend on scipy """
 
     def test_serialize(self):
@@ -993,15 +1011,16 @@ class SciPyTests(PySparkTestCase):
 
 @unittest.skipIf(not _have_numpy, "NumPy not installed")
 class NumPyTests(PySparkTestCase):
+
     """General PySpark tests that depend on numpy """
 
     def test_statcounter_array(self):
-        x = self.sc.parallelize([np.array([1.0,1.0]), np.array([2.0,2.0]), np.array([3.0,3.0])])
+        x = self.sc.parallelize([np.array([1.0, 1.0]), np.array([2.0, 2.0]), np.array([3.0, 3.0])])
         s = x.stats()
-        self.assertSequenceEqual([2.0,2.0], s.mean().tolist())
-        self.assertSequenceEqual([1.0,1.0], s.min().tolist())
-        self.assertSequenceEqual([3.0,3.0], s.max().tolist())
-        self.assertSequenceEqual([1.0,1.0], s.sampleStdev().tolist())
+        self.assertSequenceEqual([2.0, 2.0], s.mean().tolist())
+        self.assertSequenceEqual([1.0, 1.0], s.min().tolist())
+        self.assertSequenceEqual([3.0, 3.0], s.max().tolist())
+        self.assertSequenceEqual([1.0, 1.0], s.sampleStdev().tolist())
 
 
 if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/python/test_support/userlibrary.py
----------------------------------------------------------------------
diff --git a/python/test_support/userlibrary.py b/python/test_support/userlibrary.py
index 8e4a629..73fd26e 100755
--- a/python/test_support/userlibrary.py
+++ b/python/test_support/userlibrary.py
@@ -19,6 +19,8 @@
 Used to test shipping of code depenencies with SparkContext.addPyFile().
 """
 
+
 class UserClass(object):
+
     def hello(self):
         return "Hello World!"

http://git-wip-us.apache.org/repos/asf/spark/blob/d614967b/tox.ini
----------------------------------------------------------------------
diff --git a/tox.ini b/tox.ini
index 44766e5..a1fefdd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,3 +15,4 @@
 
 [pep8]
 max-line-length=100
+exclude=cloudpickle.py

